You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/28 07:28:33 UTC
[james-project] 02/09: JAMES-2813 remove listener from submit task
to the worker
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4289da532648f9145f585273607d58f09a4a2bbf
Author: RĂ©mi Kowalski <rk...@linagora.com>
AuthorDate: Wed Jul 31 18:00:48 2019 +0200
JAMES-2813 remove listener from submit task to the worker
---
.../org/apache/james/CassandraTaskManagerTest.java | 28 ++++++++-----
.../apache/james/DistributedTaskManagerModule.java | 14 +++++++
.../org/apache/james/task/MemoryTaskManager.java | 43 +++++++++++---------
.../org/apache/james/task/MemoryWorkQueue.java | 16 +++-----
.../apache/james/task/SerialTaskManagerWorker.java | 24 ++++++-----
.../org/apache/james/task/TaskManagerWorker.java | 12 +++---
.../main/java/org/apache/james/task/WorkQueue.java | 2 +-
.../eventsourcing/EventSourcingTaskManager.scala | 47 +++++++++++-----------
.../task/eventsourcing/WorkQueueSupplier.scala} | 23 +++++------
.../task/eventsourcing/WorkerStatusListener.scala | 46 ++++++++++-----------
.../james/task/SerialTaskManagerWorkerTest.java | 42 +++++++++----------
.../EventSourcingTaskManagerTest.java | 10 +++--
12 files changed, 169 insertions(+), 138 deletions(-)
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java
index 82c3f2c..72cd9ff 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/CassandraTaskManagerTest.java
@@ -19,6 +19,11 @@
package org.apache.james;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
import org.apache.james.backends.cassandra.components.CassandraModule;
@@ -31,27 +36,26 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
import org.apache.james.task.CompletedTask;
+import org.apache.james.task.MemoryWorkQueue;
+import org.apache.james.task.SerialTaskManagerWorker;
import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManager;
+import org.apache.james.task.TaskManagerWorker;
import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.WorkQueueSupplier;
+import org.apache.james.task.eventsourcing.WorkerStatusListener;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
import org.apache.james.task.eventsourcing.cassandra.TasksSerializationModule;
-
-import com.github.steveash.guavate.Guavate;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import static org.assertj.core.api.Assertions.assertThat;
+import com.github.steveash.guavate.Guavate;
class CassandraTaskManagerTest {
private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
@@ -73,8 +77,14 @@ class CassandraTaskManagerTest {
CassandraCluster cassandra = cassandraCluster.getCassandraCluster();
CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
- TaskManager taskManager1 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
- TaskManager taskManager2 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
+
+ WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
+ WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
+ TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+ return new MemoryWorkQueue(worker);
+ };
+ TaskManager taskManager1 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection);
+ TaskManager taskManager2 = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection);
TaskId taskId = taskManager1.submit(new CompletedTask());
Awaitility.await()
diff --git a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java
index 7a6391f..6f1a294 100644
--- a/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java
+++ b/server/container/guice/cassandra-guice/src/test/java/org/apache/james/DistributedTaskManagerModule.java
@@ -20,20 +20,34 @@
package org.apache.james;
+import org.apache.james.task.MemoryWorkQueue;
+import org.apache.james.task.SerialTaskManagerWorker;
import org.apache.james.task.TaskManager;
+import org.apache.james.task.TaskManagerWorker;
import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.WorkQueueSupplier;
+import org.apache.james.task.eventsourcing.WorkerStatusListener;
import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
public class DistributedTaskManagerModule extends AbstractModule {
+
+ public static final WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
+ WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
+ TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+ return new MemoryWorkQueue(worker);
+ };
+
@Override
protected void configure() {
bind(TaskExecutionDetailsProjection.class).in(Scopes.SINGLETON);
bind(TaskManager.class).in(Scopes.SINGLETON);
+ bind(WorkQueueSupplier.class).in(Scopes.SINGLETON);
bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class);
bind(TaskManager.class).to(EventSourcingTaskManager.class);
+ bind(WorkQueueSupplier.class).toInstance(workQueueSupplier);
}
}
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index ad7bd9d..f972e39 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -36,38 +36,45 @@ import reactor.core.scheduler.Schedulers;
public class MemoryTaskManager implements TaskManager {
+ @FunctionalInterface
+ private interface TaskExecutionDetailsUpdaterFactory {
+ Consumer<TaskExecutionDetailsUpdater> apply(TaskId taskId);
+ }
+
private static class DetailsUpdater implements TaskManagerWorker.Listener {
- private final Consumer<TaskExecutionDetailsUpdater> updater;
+ private final TaskExecutionDetailsUpdaterFactory updaterFactory;
- DetailsUpdater(Consumer<TaskExecutionDetailsUpdater> updater) {
- this.updater = updater;
+ DetailsUpdater(TaskExecutionDetailsUpdaterFactory updaterFactory) {
+ this.updaterFactory = updaterFactory;
}
@Override
- public void started() {
- updater.accept(TaskExecutionDetails::started);
+ public void started(TaskId taskId) {
+ updaterFactory.apply(taskId).accept(TaskExecutionDetails::started);
}
@Override
- public void completed(Task.Result result) {
- updater.accept(TaskExecutionDetails::completed);
-
+ public void completed(TaskId taskId, Task.Result result) {
+ updaterFactory.apply(taskId)
+ .accept(TaskExecutionDetails::completed);
}
@Override
- public void failed(Throwable t) {
- failed();
+ public void failed(TaskId taskId, Throwable t) {
+ failed(taskId);
}
@Override
- public void failed() {
- updater.accept(TaskExecutionDetails::failed);
+ public void failed(TaskId taskId) {
+ updaterFactory.apply(taskId)
+ .accept(TaskExecutionDetails::failed);
}
@Override
- public void cancelled() {
- updater.accept(TaskExecutionDetails::cancelEffectively);
+ public void cancelled(TaskId taskId) {
+ updaterFactory.apply(taskId)
+ .accept(TaskExecutionDetails::cancelEffectively);
}
}
@@ -80,7 +87,7 @@ public class MemoryTaskManager implements TaskManager {
public MemoryTaskManager() {
idToExecutionDetails = new ConcurrentHashMap<>();
- worker = new SerialTaskManagerWorker();
+ worker = new SerialTaskManagerWorker(updater());
workQueue = new MemoryWorkQueue(worker);
}
@@ -88,7 +95,7 @@ public class MemoryTaskManager implements TaskManager {
TaskId taskId = TaskId.generateTaskId();
TaskExecutionDetails executionDetails = TaskExecutionDetails.from(task, taskId);
idToExecutionDetails.put(taskId, executionDetails);
- workQueue.submit(new TaskWithId(taskId, task), updater(taskId));
+ workQueue.submit(new TaskWithId(taskId, task));
return taskId;
}
@@ -148,8 +155,8 @@ public class MemoryTaskManager implements TaskManager {
}
}
- private DetailsUpdater updater(TaskId id) {
- return new DetailsUpdater(updateDetails(id));
+ private DetailsUpdater updater() {
+ return new DetailsUpdater(this::updateDetails);
}
private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
index 8a720e6..cea8b04 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryWorkQueue.java
@@ -25,14 +25,12 @@ import java.util.concurrent.LinkedBlockingQueue;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
-import reactor.util.function.Tuple2;
-import reactor.util.function.Tuples;
public class MemoryWorkQueue implements WorkQueue {
private final TaskManagerWorker worker;
private final Disposable subscription;
- private final LinkedBlockingQueue<Tuple2<TaskWithId, TaskManagerWorker.Listener>> tasks;
+ private final LinkedBlockingQueue<TaskWithId> tasks;
public MemoryWorkQueue(TaskManagerWorker worker) {
this.worker = worker;
@@ -44,17 +42,15 @@ public class MemoryWorkQueue implements WorkQueue {
.subscribe();
}
- private Mono<?> dispatchTaskToWorker(Tuple2<TaskWithId, TaskManagerWorker.Listener> tuple) {
- TaskWithId taskWithId = tuple.getT1();
- TaskManagerWorker.Listener listener = tuple.getT2();
- return worker.executeTask(taskWithId, listener);
+ private Mono<?> dispatchTaskToWorker(TaskWithId taskWithId) {
+ return worker.executeTask(taskWithId);
}
- public void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener) {
+ public void submit(TaskWithId taskWithId) {
try {
- tasks.put(Tuples.of(taskWithId, listener));
+ tasks.put(taskWithId);
} catch (InterruptedException e) {
- listener.cancelled();
+ worker.cancelTask(taskWithId.getId());
}
}
diff --git a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 4736372..afb7a39 100644
--- a/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -45,19 +45,21 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
private static final Logger LOGGER = LoggerFactory.getLogger(SerialTaskManagerWorker.class);
private final ExecutorService taskExecutor;
+ private final Listener listener;
private final AtomicReference<Tuple2<TaskId, Future<?>>> runningTask;
private final Semaphore semaphore;
private final Set<TaskId> cancelledTasks;
- public SerialTaskManagerWorker() {
+ public SerialTaskManagerWorker(Listener listener) {
this.taskExecutor = Executors.newSingleThreadExecutor(NamedThreadFactory.withName("task executor"));
+ this.listener = listener;
this.cancelledTasks = Sets.newConcurrentHashSet();
this.runningTask = new AtomicReference<>();
this.semaphore = new Semaphore(1);
}
@Override
- public Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener) {
+ public Mono<Task.Result> executeTask(TaskWithId taskWithId) {
return Mono
.using(
acquireSemaphore(taskWithId, listener),
@@ -72,7 +74,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
semaphore.acquire();
return semaphore;
} catch (InterruptedException e) {
- listener.cancelled();
+ listener.cancelled(taskWithId.getId());
throw e;
}
};
@@ -87,14 +89,14 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
return Mono.fromFuture(future)
.doOnError(exception -> {
if (exception instanceof CancellationException) {
- listener.cancelled();
+ listener.cancelled(taskWithId.getId());
} else {
- listener.failed(exception);
+ listener.failed(taskWithId.getId(), exception);
}
})
.onErrorReturn(Task.Result.PARTIAL);
} else {
- listener.cancelled();
+ listener.cancelled(taskWithId.getId());
return Mono.empty();
}
};
@@ -110,21 +112,21 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
}
private Task.Result run(TaskWithId taskWithId, Listener listener) {
- listener.started();
+ listener.started(taskWithId.getId());
try {
return taskWithId.getTask()
.run()
- .onComplete(listener::completed)
+ .onComplete(result -> listener.completed(taskWithId.getId(), result))
.onFailure(() -> {
LOGGER.error("Task was partially performed. Check logs for more details. Taskid : " + taskWithId.getId());
- listener.failed();
+ listener.failed(taskWithId.getId());
});
} catch (InterruptedException e) {
- listener.cancelled();
+ listener.cancelled(taskWithId.getId());
return Task.Result.PARTIAL;
} catch (Exception e) {
LOGGER.error("Error while running task {}", taskWithId.getId(), e);
- listener.failed(e);
+ listener.failed(taskWithId.getId(), e);
return Task.Result.PARTIAL;
}
}
diff --git a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
index cacc6c8..4780cc7 100644
--- a/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
+++ b/server/task/src/main/java/org/apache/james/task/TaskManagerWorker.java
@@ -25,18 +25,18 @@ import reactor.core.publisher.Mono;
public interface TaskManagerWorker extends Closeable {
interface Listener {
- void started();
+ void started(TaskId taskId);
- void completed(Task.Result result);
+ void completed(TaskId taskId, Task.Result result);
- void failed(Throwable t);
+ void failed(TaskId taskId, Throwable t);
- void failed();
+ void failed(TaskId taskId);
- void cancelled();
+ void cancelled(TaskId taskId);
}
- Mono<Task.Result> executeTask(TaskWithId taskWithId, Listener listener);
+ Mono<Task.Result> executeTask(TaskWithId taskWithId);
void cancelTask(TaskId taskId);
}
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
index 7c8c70a..ae363ff 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/java/org/apache/james/task/WorkQueue.java
@@ -22,7 +22,7 @@ import java.io.Closeable;
public interface WorkQueue extends Closeable {
- void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener);
+ void submit(TaskWithId taskWithId);
void cancel(TaskId taskId);
}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 0569dcc..34f55cf 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -1,21 +1,21 @@
/** **************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- * ***************************************************************/
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
package org.apache.james.task.eventsourcing
import java.io.Closeable
@@ -23,25 +23,24 @@ import java.util
import com.google.common.annotations.VisibleForTesting
import javax.inject.Inject
-import org.apache.james.eventsourcing.{AggregateId, Subscriber}
import org.apache.james.eventsourcing.eventstore.{EventStore, History}
+import org.apache.james.eventsourcing.{AggregateId, Subscriber}
import org.apache.james.task._
import org.apache.james.task.eventsourcing.TaskCommand._
import scala.annotation.tailrec
class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](
- val serialTaskManagerWorker: TaskManagerWorker,
- val workQueue: WorkQueue,
- val eventStore: EventStore,
- val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
+ workQueueSupplier: WorkQueueSupplier,
+ val eventStore: EventStore,
+ val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
private val delayBetweenPollingInMs = 500
private def workDispatcher: Subscriber = {
case Created(aggregateId, _, task) =>
val taskWithId = new TaskWithId(aggregateId.taskId, task)
- workQueue.submit(taskWithId, new WorkerStatusListener(taskWithId.getId, eventSourcingSystem))
+ workQueue.submit(taskWithId)
case CancelRequested(aggregateId, _) =>
workQueue.cancel(aggregateId.taskId)
case _ =>
@@ -63,6 +62,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
workDispatcher),
eventStore = eventStore)
+ private val workQueue: WorkQueue = workQueueSupplier(eventSourcingSystem)
+
override def submit(task: Task): TaskId = {
val taskId = TaskId.generateTaskId
val command = Create(taskId, task)
diff --git a/server/task/src/main/java/org/apache/james/task/WorkQueue.java b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala
similarity index 61%
copy from server/task/src/main/java/org/apache/james/task/WorkQueue.java
copy to server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala
index 7c8c70a..0265359 100644
--- a/server/task/src/main/java/org/apache/james/task/WorkQueue.java
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkQueueSupplier.scala
@@ -1,4 +1,4 @@
-/****************************************************************
+/** **************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
@@ -6,23 +6,22 @@
* to you under the Apache License, Version 2.0 (the *
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
- ****************************************************************/
-package org.apache.james.task;
+ * ***************************************************************/
+package org.apache.james.task.eventsourcing
-import java.io.Closeable;
+import org.apache.james.eventsourcing.EventSourcingSystem
+import org.apache.james.task.WorkQueue
-public interface WorkQueue extends Closeable {
-
- void submit(TaskWithId taskWithId, TaskManagerWorker.Listener listener);
-
- void cancel(TaskId taskId);
+@FunctionalInterface
+trait WorkQueueSupplier {
+ def apply(eventSourcingSystem: EventSourcingSystem): WorkQueue
}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index 7e1ca87..2b08857 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -1,21 +1,21 @@
/** **************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- * ***************************************************************/
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
package org.apache.james.task.eventsourcing
@@ -24,15 +24,15 @@ import org.apache.james.task.Task.Result
import org.apache.james.task.eventsourcing.TaskCommand._
import org.apache.james.task.{TaskId, TaskManagerWorker}
-class WorkerStatusListener(taskId: TaskId, eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
+case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
- override def started(): Unit = eventSourcingSystem.dispatch(Start(taskId))
+ override def started(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Start(taskId))
- override def completed(result: Result): Unit = eventSourcingSystem.dispatch(Complete(taskId, result))
+ override def completed(taskId: TaskId, result: Result): Unit = eventSourcingSystem.dispatch(Complete(taskId, result))
- override def failed(t: Throwable): Unit = eventSourcingSystem.dispatch(Fail(taskId))
+ override def failed(taskId: TaskId, t: Throwable): Unit = eventSourcingSystem.dispatch(Fail(taskId))
- override def failed(): Unit = eventSourcingSystem.dispatch(Fail(taskId))
+ override def failed(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Fail(taskId))
- override def cancelled(): Unit = eventSourcingSystem.dispatch(Cancel(taskId))
+ override def cancelled(taskId: TaskId): Unit = eventSourcingSystem.dispatch(Cancel(taskId))
}
\ No newline at end of file
diff --git a/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 2aa1dc0..522ed0c 100644
--- a/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -20,6 +20,7 @@ package org.apache.james.task;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
@@ -32,18 +33,25 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
class SerialTaskManagerWorkerTest {
-
- private final SerialTaskManagerWorker worker = new SerialTaskManagerWorker();
+ private TaskManagerWorker.Listener listener;
+ private SerialTaskManagerWorker worker;
private final Task successfulTask = new CompletedTask();
private final Task failedTask = new FailedTask();
private final Task throwingTask = new ThrowingTask();
+ @BeforeEach
+ void beforeEach() {
+ listener = mock(TaskManagerWorker.Listener.class);
+ worker = new SerialTaskManagerWorker(listener);
+ }
+
@AfterEach
void tearDown() throws IOException {
worker.close();
@@ -53,35 +61,31 @@ class SerialTaskManagerWorkerTest {
void aSuccessfullTaskShouldCompleteSuccessfully() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), this.successfulTask);
- TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
-
- Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
+ Mono<Task.Result> result = worker.executeTask(taskWithId);
assertThat(result.block()).isEqualTo(Task.Result.COMPLETED);
- verify(listener, atLeastOnce()).completed(Task.Result.COMPLETED);
+ verify(listener, atLeastOnce()).completed(taskWithId.getId(), Task.Result.COMPLETED);
}
@Test
void aFailedTaskShouldCompleteWithFailedStatus() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), failedTask);
- TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
- Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
+ Mono<Task.Result> result = worker.executeTask(taskWithId);
assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
- verify(listener, atLeastOnce()).failed();
+ verify(listener, atLeastOnce()).failed(taskWithId.getId());
}
@Test
void aThrowingTaskShouldCompleteWithFailedStatus() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), throwingTask);
- TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
- Mono<Task.Result> result = worker.executeTask(taskWithId, listener);
+ Mono<Task.Result> result = worker.executeTask(taskWithId);
assertThat(result.block()).isEqualTo(Task.Result.PARTIAL);
- verify(listener, atLeastOnce()).failed(any(RuntimeException.class));
+ verify(listener, atLeastOnce()).failed(eq(taskWithId.getId()), any(RuntimeException.class));
}
@Test
@@ -98,12 +102,10 @@ class SerialTaskManagerWorkerTest {
TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
- TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
-
- worker.executeTask(taskWithId, listener).subscribe();
+ worker.executeTask(taskWithId).subscribe();
await(taskLaunched);
- verify(listener, atLeastOnce()).started();
+ verify(listener, atLeastOnce()).started(id);
verifyNoMoreInteractions(listener);
latch.countDown();
}
@@ -122,19 +124,17 @@ class SerialTaskManagerWorkerTest {
TaskWithId taskWithId = new TaskWithId(id, inProgressTask);
- TaskManagerWorker.Listener listener = mock(TaskManagerWorker.Listener.class);
-
- Mono<Task.Result> resultMono = worker.executeTask(taskWithId, listener).cache();
+ Mono<Task.Result> resultMono = worker.executeTask(taskWithId).cache();
resultMono.subscribe();
Awaitility.waitAtMost(org.awaitility.Duration.TEN_SECONDS)
- .untilAsserted(() -> verify(listener, atLeastOnce()).started());
+ .untilAsserted(() -> verify(listener, atLeastOnce()).started(id));
worker.cancelTask(id);
resultMono.block(Duration.ofSeconds(10));
- verify(listener, atLeastOnce()).cancelled();
+ verify(listener, atLeastOnce()).cancelled(id);
verifyNoMoreInteractions(listener);
}
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index 057462d..63c62b0 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -27,7 +27,6 @@ import org.apache.james.task.SerialTaskManagerWorker;
import org.apache.james.task.TaskManager;
import org.apache.james.task.TaskManagerContract;
import org.apache.james.task.TaskManagerWorker;
-import org.apache.james.task.WorkQueue;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -39,11 +38,14 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
@BeforeEach
void setUp() {
- TaskManagerWorker worker = new SerialTaskManagerWorker();
- WorkQueue workQueue = new MemoryWorkQueue(worker);
EventStore eventStore = new InMemoryEventStore();
TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
- taskManager = new EventSourcingTaskManager(worker, workQueue, eventStore, executionDetailsProjection);
+ WorkQueueSupplier workQueueSupplier = eventSourcingSystem -> {
+ WorkerStatusListener listener = new WorkerStatusListener(eventSourcingSystem);
+ TaskManagerWorker worker = new SerialTaskManagerWorker(listener);
+ return new MemoryWorkQueue(worker);
+ };
+ taskManager = new EventSourcingTaskManager(workQueueSupplier, eventStore, executionDetailsProjection);
}
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org