You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/08/01 07:22:27 UTC

[james-project] 05/09: JAMES-2813 Distribute EventSourcingTaskManager projection with Cassandra

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 2d1de55ffd2cfd8047091b4fefe913212999e9b5
Author: Gautier DI FOLCO <gd...@linagora.com>
AuthorDate: Mon Jul 29 18:02:04 2019 +0200

    JAMES-2813 Distribute EventSourcingTaskManager projection with Cassandra
---
 .../cassandra/CassandraEventStoreExtension.java    |   8 +-
 .../org/apache/james/CassandraTaskManagerTest.java |  34 ++++--
 .../apache/james/DistributedTaskManagerModule.java |   8 +-
 .../CassandraTaskExecutionDetailsProjection.scala  |  39 +++++++
 ...assandraTaskExecutionDetailsProjectionTest.java | 114 +++++++++++++++++++++
 .../eventsourcing/EventSourcingTaskManager.scala   |  12 +--
 .../task/eventsourcing/RecentTasksProjection.scala |  46 ---------
 .../TaskExecutionDetailsProjection.scala           |  14 ++-
 .../EventSourcingTaskManagerTest.java              |   3 +-
 9 files changed, 201 insertions(+), 77 deletions(-)

diff --git a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java
index dfebb5f..a044d19 100644
--- a/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java
+++ b/event-sourcing/event-store-cassandra/src/test/java/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtension.java
@@ -43,8 +43,12 @@ public class CassandraEventStoreExtension implements BeforeAllCallback, AfterAll
     private EventStoreDao eventStoreDao;
 
     public CassandraEventStoreExtension(@SuppressWarnings("rawtypes") EventDTOModule... modules) {
-        this.modules = Arrays.stream(modules).collect(ImmutableSet.toImmutableSet());
-        this.cassandra = new CassandraClusterExtension(CassandraEventStoreModule.MODULE);
+        this(new CassandraClusterExtension(CassandraEventStoreModule.MODULE), ImmutableSet.copyOf(modules));
+    }
+
+    public CassandraEventStoreExtension(CassandraClusterExtension cassandra, Set<EventDTOModule> module) {
+        this.cassandra = cassandra;
+        this.modules = module;
     }
 
     @Override
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java b/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java
index 6ce0725..82c3f2c 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/CassandraTaskManagerTest.java
@@ -19,8 +19,14 @@
 
 package org.apache.james;
 
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.server.task.json.JsonTaskSerializer;
 import org.apache.james.server.task.json.dto.TestTaskDTOModules;
@@ -29,18 +35,20 @@ import org.apache.james.task.TaskExecutionDetails;
 import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
-import org.apache.james.task.eventsourcing.MemoryRecentTasksProjection;
-import org.apache.james.task.eventsourcing.MemoryTaskExecutionDetailsProjection;
-import org.apache.james.task.eventsourcing.RecentTasksProjection;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionDAO;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjectionModule;
 import org.apache.james.task.eventsourcing.cassandra.TasksSerializationModule;
 
+import com.github.steveash.guavate.Guavate;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -48,17 +56,25 @@ import static org.assertj.core.api.Assertions.assertThat;
 class CassandraTaskManagerTest {
     private static final JsonTaskSerializer TASK_SERIALIZER = new JsonTaskSerializer(TestTaskDTOModules.COMPLETED_TASK_MODULE);
 
-    private static final List<EventDTOModule<?, ?>> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER);
+    private static final Set<EventDTOModule> MODULES = TasksSerializationModule.MODULES.apply(TASK_SERIALIZER).stream().collect(Guavate.toImmutableSet());
+
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+        CassandraModule.aggregateModules(
+            CassandraSchemaVersionModule.MODULE,
+            CassandraEventStoreModule.MODULE,
+            CassandraZonedDateTimeModule.MODULE,
+            CassandraTaskExecutionDetailsProjectionModule.MODULE()));
 
     @RegisterExtension
-    static CassandraEventStoreExtension eventStoreExtension = new CassandraEventStoreExtension(MODULES.stream().toArray(EventDTOModule[]::new));
+    static CassandraEventStoreExtension eventStoreExtension = new CassandraEventStoreExtension(cassandraCluster, MODULES);
 
     @Test
     void givenOneEventStoreTwoEventTaskManagersShareTheSameEvents(EventStore eventStore) {
-        RecentTasksProjection recentTasksProjection = new MemoryRecentTasksProjection();
-        TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
-        TaskManager taskManager1 = new EventSourcingTaskManager(eventStore, executionDetailsProjection, recentTasksProjection);
-        TaskManager taskManager2 = new EventSourcingTaskManager(eventStore, executionDetailsProjection, recentTasksProjection);
+        CassandraCluster cassandra = cassandraCluster.getCassandraCluster();
+        CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
+        TaskExecutionDetailsProjection executionDetailsProjection = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
+        TaskManager taskManager1 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
+        TaskManager taskManager2 = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
 
         TaskId taskId = taskManager1.submit(new CompletedTask());
         Awaitility.await()
diff --git a/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java b/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java
index 7e6fc6d..7a6391f 100644
--- a/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java
+++ b/server/container/guice/guice-common/src/test/java/org/apache/james/DistributedTaskManagerModule.java
@@ -22,10 +22,8 @@ package org.apache.james;
 
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.eventsourcing.EventSourcingTaskManager;
-import org.apache.james.task.eventsourcing.MemoryRecentTasksProjection;
-import org.apache.james.task.eventsourcing.MemoryTaskExecutionDetailsProjection;
-import org.apache.james.task.eventsourcing.RecentTasksProjection;
 import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection;
+import org.apache.james.task.eventsourcing.cassandra.CassandraTaskExecutionDetailsProjection;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
@@ -33,11 +31,9 @@ import com.google.inject.Scopes;
 public class DistributedTaskManagerModule extends AbstractModule {
     @Override
     protected void configure() {
-        bind(RecentTasksProjection.class).in(Scopes.SINGLETON);
         bind(TaskExecutionDetailsProjection.class).in(Scopes.SINGLETON);
         bind(TaskManager.class).in(Scopes.SINGLETON);
-        bind(RecentTasksProjection.class).to(MemoryRecentTasksProjection.class);
-        bind(TaskExecutionDetailsProjection.class).to(MemoryTaskExecutionDetailsProjection.class);
+        bind(TaskExecutionDetailsProjection.class).to(CassandraTaskExecutionDetailsProjection.class);
         bind(TaskManager.class).to(EventSourcingTaskManager.class);
     }
 }
diff --git a/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala
new file mode 100644
index 0000000..f621c44
--- /dev/null
+++ b/server/task-cassandra/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjection.scala
@@ -0,0 +1,39 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+
+package org.apache.james.task.eventsourcing.cassandra
+
+import javax.inject.Inject
+import scala.compat.java8.OptionConverters._
+import collection.JavaConverters._
+import org.apache.james.task.{TaskExecutionDetails, TaskId}
+import org.apache.james.task.eventsourcing.TaskExecutionDetailsProjection
+
+@Inject
+class CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO: CassandraTaskExecutionDetailsProjectionDAO) extends TaskExecutionDetailsProjection {
+
+  override def load(taskId: TaskId): Option[TaskExecutionDetails] =
+    cassandraTaskExecutionDetailsProjectionDAO.readDetails(taskId).blockOptional().asScala
+
+  override def list: List[TaskExecutionDetails] =
+    cassandraTaskExecutionDetailsProjectionDAO.listDetails().collectList().block().asScala.toList
+
+  override def update(details: TaskExecutionDetails): Unit =
+    cassandraTaskExecutionDetailsProjectionDAO.saveDetails(details).block()
+}
diff --git a/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionTest.java b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionTest.java
new file mode 100644
index 0000000..0c1743a
--- /dev/null
+++ b/server/task-cassandra/src/test/java/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionTest.java
@@ -0,0 +1,114 @@
+/** **************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ * ***************************************************************/
+
+package org.apache.james.task.eventsourcing.cassandra;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import org.apache.james.backends.cassandra.CassandraCluster;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.backends.cassandra.init.CassandraZonedDateTimeModule;
+import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskId;
+import org.apache.james.task.TaskManager;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import scala.collection.JavaConverters;
+import scala.compat.java8.OptionConverters;
+
+class CassandraTaskExecutionDetailsProjectionTest {
+
+    private static final TaskId TASK_ID = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd");
+    private static final TaskId TASK_ID_2 = TaskId.fromString("2c7f4081-aa30-11e9-bf6c-2d3b9e84aafe");
+
+    private static TaskExecutionDetails TASK_EXECUTION_DETAILS =  new TaskExecutionDetails(TASK_ID, "type", Optional.empty(),
+            TaskManager.Status.COMPLETED, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+    private static TaskExecutionDetails TASK_EXECUTION_DETAILS_2 =  new TaskExecutionDetails(TASK_ID_2, "type", Optional.empty(),
+            TaskManager.Status.COMPLETED, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+    private static TaskExecutionDetails TASK_EXECUTION_DETAILS_UPDATED =  new TaskExecutionDetails(TASK_ID, "type", Optional.empty(),
+            TaskManager.Status.FAILED, Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
+
+    @RegisterExtension
+    static CassandraClusterExtension cassandraCluster = new CassandraClusterExtension(
+            CassandraModule.aggregateModules(CassandraSchemaVersionModule.MODULE, CassandraZonedDateTimeModule.MODULE, CassandraTaskExecutionDetailsProjectionModule.MODULE()));
+
+    private CassandraTaskExecutionDetailsProjection testee;
+
+    @BeforeEach
+    void setUp(CassandraCluster cassandra) {
+        CassandraTaskExecutionDetailsProjectionDAO cassandraTaskExecutionDetailsProjectionDAO = new CassandraTaskExecutionDetailsProjectionDAO(cassandra.getConf(), cassandra.getTypesProvider());
+        testee = new CassandraTaskExecutionDetailsProjection(cassandraTaskExecutionDetailsProjectionDAO);
+    }
+
+    @Test
+    void loadShouldBeAbleToRetrieveASavedRecord() {
+        testee.update(TASK_EXECUTION_DETAILS);
+
+        Optional<TaskExecutionDetails> taskExecutionDetails = OptionConverters.toJava(testee.load(TASK_ID));
+        assertThat(taskExecutionDetails).contains(TASK_EXECUTION_DETAILS);
+    }
+
+    @Test
+    void updateShouldUpdateRecords() {
+        testee.update(TASK_EXECUTION_DETAILS);
+
+        testee.update(TASK_EXECUTION_DETAILS_UPDATED);
+
+        Optional<TaskExecutionDetails> taskExecutionDetails = OptionConverters.toJava(testee.load(TASK_ID));
+        assertThat(taskExecutionDetails).contains(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+
+    @Test
+    void loadShouldReturnEmptyWhenNone() {
+        Optional<TaskExecutionDetails> taskExecutionDetails = OptionConverters.toJava(testee.load(TASK_ID));
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listShouldReturnEmptyWhenNone() {
+        List<TaskExecutionDetails> taskExecutionDetails = JavaConverters.asJava(testee.list());
+        assertThat(taskExecutionDetails).isEmpty();
+    }
+
+    @Test
+    void listShouldReturnAllRecords() {
+        testee.update(TASK_EXECUTION_DETAILS);
+        testee.update(TASK_EXECUTION_DETAILS_2);
+
+        List<TaskExecutionDetails> taskExecutionDetails = JavaConverters.asJava(testee.list());
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS, TASK_EXECUTION_DETAILS_2);
+    }
+
+    @Test
+    void listDetailsShouldReturnLastUpdatedRecords() {
+        testee.update(TASK_EXECUTION_DETAILS);
+        testee.update(TASK_EXECUTION_DETAILS_UPDATED);
+
+        List<TaskExecutionDetails> taskExecutionDetails = JavaConverters.asJava(testee.list());
+        assertThat(taskExecutionDetails).containsOnly(TASK_EXECUTION_DETAILS_UPDATED);
+    }
+}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index d5c8d42..17c9ebd 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -31,8 +31,7 @@ import org.apache.james.task.eventsourcing.TaskCommand._
 import scala.annotation.tailrec
 
 class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing](val eventStore: EventStore,
-                                                                                 val executionDetailsProjection: TaskExecutionDetailsProjection,
-                                                                                 val recentTasksProjection: RecentTasksProjection) extends TaskManager with Closeable {
+                                                                                 val executionDetailsProjection: TaskExecutionDetailsProjection) extends TaskManager with Closeable {
 
   private val workQueue: WorkQueue = WorkQueue.builder().worker(new SerialTaskManagerWorker)
   private val delayBetweenPollingInMs = 500
@@ -59,8 +58,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
       new FailCommandHandler(loadHistory)),
     subscribers = Set(
       executionDetailsProjection.asSubscriber,
-      workDispatcher,
-      recentTasksProjection.asSubscriber),
+      workDispatcher),
     eventStore = eventStore)
 
   override def submit(task: Task): TaskId = {
@@ -79,9 +77,9 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
     .filter(details => details.getStatus == status)
     .asJava
 
-  private def listScala: List[TaskExecutionDetails] = recentTasksProjection
-    .list()
-    .flatMap(executionDetailsProjection.load)
+  private def listScala: List[TaskExecutionDetails] = executionDetailsProjection
+    .list
+    .flatMap(details => executionDetailsProjection.load(details.taskId))
 
   override def cancel(id: TaskId): Unit = {
     val command = RequestCancel(id)
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala
deleted file mode 100644
index e908e90..0000000
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/RecentTasksProjection.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/** **************************************************************
-  * Licensed to the Apache Software Foundation (ASF) under one   *
-  * or more contributor license agreements.  See the NOTICE file *
-  * distributed with this work for additional information        *
-  * regarding copyright ownership.  The ASF licenses this file   *
-  * to you under the Apache License, Version 2.0 (the            *
-  * "License"); you may not use this file except in compliance   *
-  * with the License.  You may obtain a copy of the License at   *
-  * *
-  * http://www.apache.org/licenses/LICENSE-2.0                 *
-  * *
-  * Unless required by applicable law or agreed to in writing,   *
-  * software distributed under the License is distributed on an  *
-  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
-  * KIND, either express or implied.  See the License for the    *
-  * specific language governing permissions and limitations      *
-  * under the License.                                           *
-  * ***************************************************************/
-package org.apache.james.task.eventsourcing
-
-import java.util.concurrent.ConcurrentLinkedDeque
-
-import org.apache.james.eventsourcing.Subscriber
-import org.apache.james.task.TaskId
-
-trait RecentTasksProjection {
-  def list(): List[TaskId]
-
-  def asSubscriber: Subscriber = {
-    case Created(aggregateId, _, _) => add(aggregateId.taskId)
-    case _ =>
-  }
-
-  def add(taskId: TaskId): Unit
-}
-
-class MemoryRecentTasksProjection() extends RecentTasksProjection {
-
-  import scala.collection.JavaConverters._
-
-  private val tasks = new ConcurrentLinkedDeque[TaskId]
-
-  override def list(): List[TaskId] = tasks.asScala.toList
-
-  override def add(taskId: TaskId): Unit = tasks.add(taskId)
-}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 18c65dc..918a5e0 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -22,11 +22,12 @@ import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.james.eventsourcing.Subscriber
 import org.apache.james.task.{TaskExecutionDetails, TaskId}
+import collection.JavaConverters._
 
 trait TaskExecutionDetailsProjection {
   val asSubscriber: Subscriber = {
     case created: Created =>
-      update(created.getAggregateId.taskId, TaskExecutionDetails.from(created.task, created.aggregateId.taskId))
+      update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId))
     case cancelRequested: CancelRequested =>
       update(cancelRequested.aggregateId.taskId)(_.cancelRequested)
     case started: Started =>
@@ -42,10 +43,11 @@ trait TaskExecutionDetailsProjection {
   private def update(taskId: TaskId)(updater: TaskExecutionDetails => TaskExecutionDetails): Unit =
     load(taskId)
       .map(updater)
-      .foreach(update(taskId, _))
+      .foreach(update)
 
   def load(taskId: TaskId): Option[TaskExecutionDetails]
-  def update(taskId: TaskId, details: TaskExecutionDetails): Unit
+  def list: List[TaskExecutionDetails]
+  def update(details: TaskExecutionDetails): Unit
 }
 
 class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProjection {
@@ -53,5 +55,7 @@ class MemoryTaskExecutionDetailsProjection() extends TaskExecutionDetailsProject
 
   override def load(taskId: TaskId): Option[TaskExecutionDetails] = Option(this.details.get(taskId))
 
-  override def update(taskId: TaskId, details: TaskExecutionDetails): Unit = this.details.put(taskId, details)
-}
\ No newline at end of file
+  override def list: List[TaskExecutionDetails] = this.details.values().asScala.toList
+
+  override def update(details: TaskExecutionDetails): Unit = this.details.put(details.taskId, details)
+}
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index f904b3ed..b017721 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -36,9 +36,8 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
     @BeforeEach
     void setUp() {
         EventStore eventStore = new InMemoryEventStore();
-        RecentTasksProjection recentTasksProjection = new MemoryRecentTasksProjection();
         TaskExecutionDetailsProjection executionDetailsProjection = new MemoryTaskExecutionDetailsProjection();
-        taskManager = new EventSourcingTaskManager(eventStore, executionDetailsProjection, recentTasksProjection);
+        taskManager = new EventSourcingTaskManager(eventStore, executionDetailsProjection);
     }
 
     @AfterEach


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org