You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:42 UTC

[james-project] 09/17: JAMES-2813 wire UpdateAdditionalInformation command in the Event Sourcing system

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1b1ee833b339a80ac949f6350e6c0a6d8e847e1b
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Oct 9 15:21:47 2019 +0200

    JAMES-2813 wire UpdateAdditionalInformation command in the Event Sourcing system
---
 .../distributed/TasksSerializationModule.java      | 11 ++++++++-
 .../eventsourcing/distributed/TaskEventDTO.scala   | 19 +++++++++++++++
 .../apache/james/task/SerialTaskManagerWorker.java |  2 +-
 .../james/task/eventsourcing/CommandHandlers.scala |  8 +++++++
 .../task/eventsourcing/DecisionProjection.scala    | 27 +++++++++++-----------
 .../eventsourcing/EventSourcingTaskManager.scala   |  3 ++-
 .../task/eventsourcing/WorkerStatusListener.scala  |  6 ++---
 .../james/task/SerialTaskManagerWorkerTest.java    |  5 ++--
 8 files changed, 59 insertions(+), 22 deletions(-)

diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
index 07cadec..ed2a80a 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
@@ -26,6 +26,7 @@ import java.util.stream.Stream;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.server.task.json.JsonTaskAdditionalInformationsSerializer;
 import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.task.eventsourcing.AdditionalInformationUpdated;
 import org.apache.james.task.eventsourcing.CancelRequested;
 import org.apache.james.task.eventsourcing.Cancelled;
 import org.apache.james.task.eventsourcing.Completed;
@@ -90,8 +91,16 @@ public interface TasksSerializationModule {
         .typeName("task-manager-cancelled")
         .withFactory(EventDTOModule::new);
 
+    TaskSerializationModuleFactory<AdditionalInformationUpdated, AdditionalInformationUpdatedDTO> UPDATED = (jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer) -> EventDTOModule
+        .forEvent(AdditionalInformationUpdated.class)
+        .convertToDTO(AdditionalInformationUpdatedDTO.class)
+        .toDomainObjectConverter(dto -> dto.toDomainObject(jsonTaskAdditionalInformationsSerializer))
+        .toDTOConverter((event, typeName) -> AdditionalInformationUpdatedDTO.fromDomainObject(jsonTaskAdditionalInformationsSerializer, event, typeName))
+        .typeName("task-manager-updated")
+        .withFactory(EventDTOModule::new);
+
     BiFunction<JsonTaskSerializer, JsonTaskAdditionalInformationsSerializer, List<EventDTOModule<?, ?>>> MODULES = (jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer) -> Stream
-        .of(CREATED, STARTED, CANCEL_REQUESTED, CANCELLED, COMPLETED, FAILED)
+        .of(CREATED, STARTED, CANCEL_REQUESTED, CANCELLED, COMPLETED, FAILED, UPDATED)
         .map(moduleFactory -> moduleFactory.create(jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer))
         .collect(Guavate.toImmutableList());
 }
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
index 978d08e..4730a4c 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
@@ -27,6 +27,7 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO
 import org.apache.james.server.task.json.{JsonTaskAdditionalInformationsSerializer, JsonTaskSerializer}
 import org.apache.james.task.eventsourcing._
 import org.apache.james.task.{Hostname, Task, TaskId}
+
 import scala.compat.java8.OptionConverters._
 
 sealed abstract class TaskEventDTO(val getType: String, val getAggregate: String, val getEvent: Int) extends EventDTO {
@@ -140,3 +141,21 @@ object CancelledDTO {
     CancelledDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializedAdditionalInformations)
   }
 }
+
+case class AdditionalInformationUpdatedDTO(@JsonProperty("type") typeName: String,
+                     @JsonProperty("aggregate") aggregateId: String,
+                     @JsonProperty("event") eventId: Int,
+                     @JsonProperty("additionalInformation") getAdditionalInformation: String)
+  extends TaskEventDTO(typeName, aggregateId, eventId) {
+  def toDomainObject(jsonTaskAdditionalInformationsSerializer: JsonTaskAdditionalInformationsSerializer): AdditionalInformationUpdated = {
+    val deserializedAdditionalInformation = jsonTaskAdditionalInformationsSerializer.deserialize(getAdditionalInformation)
+    AdditionalInformationUpdated(domainAggregateId, domainEventId, deserializedAdditionalInformation)
+  }
+}
+
+object AdditionalInformationUpdatedDTO {
+  def fromDomainObject(jsonTaskAdditionalInformationsSerializer: JsonTaskAdditionalInformationsSerializer)(event: AdditionalInformationUpdated, typeName: String): AdditionalInformationUpdatedDTO = {
+    val serializedAdditionalInformations = jsonTaskAdditionalInformationsSerializer.serialize(event.additionalInformation)
+    AdditionalInformationUpdatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializedAdditionalInformations)
+  }
+}
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 8fb0e15..60b5a16 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -114,7 +114,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
             .delayElement(Duration.ofSeconds(1))
             .repeat()
             .flatMap(Mono::justOrEmpty)
-            .doOnNext(information -> listener.updated(taskWithId.getId(), taskWithId.getTask().type(), information));
+            .doOnNext(information -> listener.updated(taskWithId.getId(), information));
     }
 
 
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index 1184988..9db161e 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -81,4 +81,12 @@ class FailCommandHandler(private val loadHistory: TaskAggregateId => History) ex
   override def handle(command: Fail): util.List[_ <: Event] = {
     loadAggregate(loadHistory, command.id).fail(command.additionalInformation, command.errorMessage, command.exception)
   }
+}
+
+class UpdateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[UpdateAdditionalInformation] {
+  override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation]
+
+  override def handle(command: UpdateAdditionalInformation): util.List[_ <: Event] = {
+    loadAggregate(loadHistory, command.id).update(command.additionalInformation)
+  }
 }
\ No newline at end of file
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
index d5394bb..b5a460f 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
@@ -22,22 +22,23 @@ import org.apache.james.eventsourcing.Event
 import org.apache.james.task.TaskManager.Status
 
 case class DecisionProjection(status: Option[Status]) {
-  val update: Event => DecisionProjection =
-    DecisionProjection.create
+  def update(event: Event): DecisionProjection = {
+    DecisionProjection(
+      event match {
+        case event: Created => Some(Status.WAITING)
+        case event: Started => Some(Status.IN_PROGRESS)
+        case event: CancelRequested => Some(Status.CANCEL_REQUESTED)
+        case event: Cancelled => Some(Status.CANCELLED)
+        case event: Completed => Some(Status.COMPLETED)
+        case event: Failed => Some(Status.FAILED)
+        case event: AdditionalInformationUpdated => status
+      }
+    )
+  }
+
 }
 
 object DecisionProjection {
-  def create(event: Event): DecisionProjection = DecisionProjection(Some(fromEvent(event)))
-
   def empty: DecisionProjection = DecisionProjection(None)
-
-  private def fromEvent(event: Event): Status = event match {
-    case event: Created => Status.WAITING
-    case event: Started => Status.IN_PROGRESS
-    case event: CancelRequested => Status.CANCEL_REQUESTED
-    case event: Cancelled => Status.CANCELLED
-    case event: Completed => Status.COMPLETED
-    case event: Failed => Status.FAILED
-  }
 }
 
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 20cf21d..35856b0 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -59,7 +59,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
       new RequestCancelCommandHandler(loadHistory, hostname),
       new CompleteCommandHandler(loadHistory),
       new CancelCommandHandler(loadHistory),
-      new FailCommandHandler(loadHistory)),
+      new FailCommandHandler(loadHistory),
+      new UpdateCommandHandler(loadHistory)),
     subscribers = Set(
       executionDetailsProjection.asSubscriber(hostname),
       workDispatcher,
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index b6a3f80..aa93d82 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -21,13 +21,12 @@ package org.apache.james.task.eventsourcing
 
 import java.util.Optional
 
+import com.google.common.base.Throwables
 import org.apache.james.eventsourcing.EventSourcingSystem
 import org.apache.james.task.Task.Result
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker}
 
-import com.google.common.base.Throwables
-
 import scala.compat.java8.OptionConverters._
 
 case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
@@ -49,5 +48,6 @@ case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extend
   override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
     eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala ))
 
-  override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit = ???
+  override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit =
+    eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation))
 }
\ No newline at end of file
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 7448a20..b693347 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -100,10 +100,9 @@ class SerialTaskManagerWorkerTest {
 
         worker.executeTask(taskWithId).block();
 
-        verify(listener, atMost(2)).updated(eq(taskWithId.getId()), notNull());
+        verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
     }
 
-
     @Test
     void aRunningTaskShouldEmitAtMostOneInformationPerSecond() {
         TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
@@ -115,7 +114,7 @@ class SerialTaskManagerWorkerTest {
 
         worker.executeTask(taskWithId).block();
 
-        verify(listener, times(2)).updated(eq(taskWithId.getId()), notNull());
+        verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org