You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/01 07:22:27 UTC
[james-project] 05/09: JAMES-2813 Distribute EventSourcingTaskManager projection with Cassandra
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 2d1de55ffd2cfd8047091b4fefe913212999e9b5
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Mon Jul 29 18:02:04 2019 +0200
JAMES-2813 Distribute EventSourcingTaskManager projection with Cassandra
---
.../cassandra/CassandraEventStoreExtension.java | 8 +-
.../org/apache/james/CassandraTaskManagerTest.java | 34 ++++--
.../apache/james/DistributedTaskManagerModule.java | 8 +-
.../CassandraTaskExecutionDetailsProjection.scala | 39 +++++++
...assandraTaskExecutionDetailsProjectionTest.java | 114 +++++++++++++++++++++
.../eventsourcing/EventSourcingTaskManager.scala | 12 +--
.../task/eventsourcing/RecentTasksProjection.scala | 46 ---------
.../TaskExecutionDetailsProjection.scala | 14 ++-
.../EventSourcingTaskManagerTest.java | 3 +-
9 files changed, 201 insertions(+), 77 deletions(-)
diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java
index dfebb5f..a044d19 100644
--- a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java
+++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java
@@ -43,8 +43,12 @@ public class CassandraEventStoreExtension implements BeforeAllCallback, AfterAll
private EventStoreDao eventStoreDao;
public CassandraEventStoreExtension(@SuppressWarnings("rawtypes") EventDTOModule... modules) {
- this.modules = Arrays.stream(modules).collect(ImmutableSet.toImmutableSet());
- this.cassandra = new CassandraClusterExtension(CassandraEventStoreModule.MODULE);
+ this(new CassandraClusterExtension(CassandraEventStoreModule.MODULE), ImmutableSet.copyOf(modules));
+ }
+
+ public CassandraEventStoreExtension(CassandraClusterExtension cassandra, Set<EventDTOModule> module) {
+ this.cassandra = cassandra;
+ this.modules = module;
}
@Override
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java b/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java
index 6ce0725..82c3f2c 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java
@@ -19,8 +19,14 @@
package org.apache.james;
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.server.task.json.JsonTaskSerializer;
import org.apache.james.server.task.json.dto.TestTaskDTOModules;
@@ -29,18 +35,20 @@ import org.apache.james.task.TaskExecutionDetails;
import org.apache.james.task.TaskId;
import org.apache.james.task.TaskManager;
import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
-import org.apache.james.task.eventsourcing.MemoryRecentTasksProjection;
-import org.apache.james.task.eventsourcing.MemoryTaskExecutionDetailsProjection;
-import org.apache.james.task.eventsourcing.RecentTasksProjection;
import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
import org.apache.james.task.eventsourcing.cassandra.TasksSerializationModule;
+import com.github.steveash.guavate.Guavate;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import java.util.List;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import static org.assertj.core.api.Assertions.assertThat;
@@ -48,17 +56,25 @@ import static org.assertj.core.api.Assertions.assertThat;
class CassandraTaskManagerTest {
private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
- private static final List<EventDTOModule<?, ?>> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER);
+ private static final Set<EventDTOModule> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER).stream().collect(Guavate.toImmutableSet());
+
+ static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+ CassandraModule.aggregateModules(
+ CassandraSchemaVersionModule.MODULE,
+ CassandraEventStoreModule.MODULE,
+ CassandraZonedDateTimeModule.MODULE,
+ CassandraTaskExecutionDetailsProjectionModule.MODULE()));
@RegisterExtension
- static CassandraEventStoreExtension eventStoreExtension = new CassandraEventStoreExtension(MODULES.stream().toArray(EventDTOModule[]::new));
+ static CassandraEventStoreExtension eventStoreExtension = new CassandraEventStoreExtension(cassandraCluster, MODULES);
@Test
void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents(EventStore eventStore) {
- RecentTasksProjection recentTasksProjection = new MemoryRecentTasksProjection();
- TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
- TaskManager taskManager1 = new EventSourcingTaskManager(eventStore, executionDetailsProjection, recentTasksProjection);
- TaskManager taskManager2 = new EventSourcingTaskManager(eventStore, executionDetailsProjection, recentTasksProjection);
+ CassandraCluster cassandra = cassandraCluster.getCassandraCluster();
+ CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
+ TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
+ TaskManager taskManager1 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
+ TaskManager taskManager2 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
TaskId taskId = taskManager1.submit(new CompletedTask());
Awaitility.await()
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java
index 7e6fc6d..7a6391f 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java
@@ -22,10 +22,8 @@ package org.apache.james;
import org.apache.james.task.TaskManager;
import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
-import org.apache.james.task.eventsourcing.MemoryRecentTasksProjection;
-import org.apache.james.task.eventsourcing.MemoryTaskExecutionDetailsProjection;
-import org.apache.james.task.eventsourcing.RecentTasksProjection;
import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
import com.google.inject.AbstractModule;
import com.google.inject.Scopes;
@@ -33,11 +31,9 @@ import com.google.inject.Scopes;
public class DistributedTaskManagerModule extends AbstractModule {
@Override
protected void configure() {
- bind(RecentTasksProjection.class).in(Scopes.SINGLETON);
bind(TaskExecutionDetailsProjection.class).in(Scopes.SINGLETON);
bind(TaskManager.class).in(Scopes.SINGLETON);
- bind(RecentTasksProjection.class).to(MemoryRecentTasksProjection.class);
- bind(TaskExecutionDetailsProjection.class).to(MemoryTaskExecutionDetailsProjection.class);
+ bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class);
bind(TaskManager.class).to(EventSourcingTaskManager.class);
}
}
diff --git a/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala
new file mode 100644
index 0000000..f621c44
--- /dev/null
+++ b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala
@@ -0,0 +1,39 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
+
+package org.apache.james.task.eventsourcing.cassandra
+
+import javax.inject.Inject
+import scala.compat.java8.OptionConverters._
+import collection.JavaConverters._
+import org.apache.james.task.{TaskExecutionDetails, TaskId}
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection
+
+/**
+ * [[TaskExecutionDetailsProjection]] that delegates every operation to a
+ * [[CassandraTaskExecutionDetailsProjectionDAO]] and blocks on each result,
+ * so callers keep a synchronous API.
+ *
+ * `@Inject` is placed on the primary constructor: an annotation written in
+ * front of `class` is attached to the class itself, where constructor
+ * injection does not look for it.
+ */
+class CassandraTaskExecutionDetailsProjection @Inject()(cassandraTaskExecutionDetailsProjectionDAO: CassandraTaskExecutionDetailsProjectionDAO) extends TaskExecutionDetailsProjection {
+
+  /** Reads the details stored for `taskId`; `None` when the DAO yields nothing. */
+  override def load(taskId: TaskId): Option[TaskExecutionDetails] =
+    cassandraTaskExecutionDetailsProjectionDAO.readDetails(taskId).blockOptional().asScala
+
+  /** Collects every entry the DAO lists into an immutable Scala list. */
+  override def list: List[TaskExecutionDetails] =
+    cassandraTaskExecutionDetailsProjectionDAO.listDetails().collectList().block().asScala.toList
+
+  /** Saves `details` through the DAO and blocks until the call returns. */
+  override def update(details: TaskExecutionDetails): Unit =
+    cassandraTaskExecutionDetailsProjectionDAO.saveDetails(details).block()
+}
diff --git a/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionTest.java b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionTest.java
new file mode 100644
index 0000000..0c1743a
--- /dev/null
+++ b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionTest.java
@@ -0,0 +1,114 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ * ***************************************************************/
+
+package org.apache.james.task.eventsourcing.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import scala.collection.JavaConverters;
+import scala.compat.java8.OptionConverters;
+
+/**
+ * Exercises {@code load}, {@code list} and {@code update} of
+ * {@link CassandraTaskExecutionDetailsProjection} against the Cassandra
+ * cluster supplied by {@link CassandraClusterExtension}.
+ */
+class CassandraTaskExecutionDetailsProjectionTest {
+
+    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+    private static final TaskId TASK_ID_2 = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafe");
+
+    // Shared fixtures are constants: declared final so no test can reassign them.
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS = new TaskExecutionDetails(TASK_ID, "type", Optional.empty(),
+        TaskManager.Status.COMPLETED, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS_2 = new TaskExecutionDetails(TASK_ID_2, "type", Optional.empty(),
+        TaskManager.Status.COMPLETED, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+    // Same task id as TASK_EXECUTION_DETAILS but with a FAILED status, used to check overwrites.
+    private static final TaskExecutionDetails TASK_EXECUTION_DETAILS_UPDATED = new TaskExecutionDetails(TASK_ID, "type", Optional.empty(),
+        TaskManager.Status.FAILED, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+        CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraZonedDateTimeModule.MODULE, CassandraTaskExecutionDetailsProjectionModule.MODULE()));
+
+    private CassandraTaskExecutionDetailsProjection testee;
+
+    /** Builds a fresh projection over a DAO wired to the injected cluster before each test. */
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
+        testee = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
+    }
+
+    @Test
+    void loadShouldBeAbleToRetrieveASavedRecord() {
+        testee.update(TASK_EXECUTION_DETAILS);
+
+        Optional<TaskExecutionDetails> taskExecutionDetails = OptionConverters.toJava(testee.load(TASK_ID));
+        assertThat(taskExecutionDetails).contains(TASK_EXECUTION_DETAILS);
+    }
+
+    @Test
+    void updateShouldUpdateRecords() {
+        testee.update(TASK_EXECUTION_DETAILS);
+
+        testee.update(TASK_EXECUTION_DETAILS_UPDATED);
+
+        Optional<TaskExecutionDetails> taskExecutionDetails = OptionConverters.toJava(testee.load(TASK_ID));
+        assertThat(taskExecutionDetails).contains(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+
+    @Test
+    void loadShouldReturnEmptyWhenNone() {
+        Optional<TaskExecutionDetails> taskExecutionDetails = OptionConverters.toJava(testee.load(TASK_ID));
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listShouldReturnEmptyWhenNone() {
+        List<TaskExecutionDetails> taskExecutionDetails = JavaConverters.asJava(testee.list());
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listShouldReturnAllRecords() {
+        testee.update(TASK_EXECUTION_DETAILS);
+        testee.update(TASK_EXECUTION_DETAILS_2);
+
+        List<TaskExecutionDetails> taskExecutionDetails = JavaConverters.asJava(testee.list());
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS, TASK_EXECUTION_DETAILS_2);
+    }
+
+    @Test
+    void listDetailsShouldReturnLastUpdatedRecords() {
+        testee.update(TASK_EXECUTION_DETAILS);
+        testee.update(TASK_EXECUTION_DETAILS_UPDATED);
+
+        List<TaskExecutionDetails> taskExecutionDetails = JavaConverters.asJava(testee.list());
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index d5c8d42..17c9ebd 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -31,8 +31,7 @@ import org.apache.james.task.eventsourcing.TaskCommand._
import scala.annotation.tailrec
class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](val eventStore: EventStore,
- val executionDetailsProjection: TaskExecutionDetailsProjection,
- val recentTasksProjection: RecentTasksProjection) extends TaskManager with Closeable {
+ val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
private val workQueue: WorkQueue = WorkQueue.builder().worker(new SerialTaskManagerWorker)
private val delayBetweenPollingInMs = 500
@@ -59,8 +58,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
new FailCommandHandler(loadHistory)),
subscribers = Set(
executionDetailsProjection.asSubscriber,
- workDispatcher,
- recentTasksProjection.asSubscriber),
+ workDispatcher),
eventStore = eventStore)
override def submit(task: Task): TaskId = {
@@ -79,9 +77,9 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
.filter(details => details.getStatus == status)
.asJava
- private def listScala: List[TaskExecutionDetails] = recentTasksProjection
- .list()
- .flatMap(executionDetailsProjection.load)
+  // The projection's list already yields complete TaskExecutionDetails, so the
+  // entries are returned as-is instead of being re-loaded one by one by task id.
+  private def listScala: List[TaskExecutionDetails] = executionDetailsProjection.list
override def cancel(id: TaskId): Unit = {
val command = RequestCancel(id)
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala
deleted file mode 100644
index e908e90..0000000
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/** **************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- * ***************************************************************/
-package org.apache.james.task.eventsourcing
-
-import java.util.concurrent.ConcurrentLinkedDeque
-
-import org.apache.james.eventsourcing.Subscriber
-import org.apache.james.task.TaskId
-
-trait RecentTasksProjection {
- def list(): List[TaskId]
-
- def asSubscriber: Subscriber = {
- case Created(aggregateId, _, _) => add(aggregateId.taskId)
- case _ =>
- }
-
- def add(taskId: TaskId): Unit
-}
-
-class MemoryRecentTasksProjection() extends RecentTasksProjection {
-
- import scala.collection.JavaConverters._
-
- private val tasks = new ConcurrentLinkedDeque[TaskId]
-
- override def list(): List[TaskId] = tasks.asScala.toList
-
- override def add(taskId: TaskId): Unit = tasks.add(taskId)
-}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 18c65dc..918a5e0 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -22,11 +22,12 @@ import java.util.concurrent.ConcurrentHashMap
import org.apache.james.eventsourcing.Subscriber
import org.apache.james.task.{TaskExecutionDetails, TaskId}
+import collection.JavaConverters._
trait TaskExecutionDetailsProjection {
val asSubscriber: Subscriber = {
case created: Created =>
- update(created.getAggregateId.taskId, TaskExecutionDetails.from(created.task, created.aggregateId.taskId))
+ update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId))
case cancelRequested: CancelRequested =>
update(cancelRequested.aggregateId.taskId)(_.cancelRequested)
case started: Started =>
@@ -42,10 +43,11 @@ trait TaskExecutionDetailsProjection {
private def update(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Unit =
load(taskId)
.map(updater)
- .foreach(update(taskId, _))
+ .foreach(update)
def load(taskId: TaskId): Option[TaskExecutionDetails]
- def update(taskId: TaskId, details: TaskExecutionDetails): Unit
+ def list: List[TaskExecutionDetails]
+ def update(details: TaskExecutionDetails): Unit
}
class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProjection {
@@ -53,5 +55,7 @@ class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProject
override def load(taskId: TaskId): Option[TaskExecutionDetails] = Option(this.details.get(taskId))
- override def update(taskId: TaskId, details: TaskExecutionDetails): Unit = this.details.put(taskId, details)
-}
\ No newline at end of file
+ override def list: List[TaskExecutionDetails] = this.details.values().asScala.toList
+
+ override def update(details: TaskExecutionDetails): Unit = this.details.put(details.taskId, details)
+}
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index f904b3ed..b017721 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -36,9 +36,8 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
@BeforeEach
void setUp() {
EventStore eventStore = new InMemoryEventStore();
- RecentTasksProjection recentTasksProjection = new MemoryRecentTasksProjection();
TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
- taskManager = new EventSourcingTaskManager(eventStore, executionDetailsProjection, recentTasksProjection);
+ taskManager = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
}
@AfterEach
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org