You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:42 UTC
[james-project] 09/17: JAMES-2813 wire UpdateAdditionalInformation
command in the Event Sourcing system
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1b1ee833b339a80ac949f6350e6c0a6d8e847e1b
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Wed Oct 9 15:21:47 2019 +0200
JAMES-2813 wire UpdateAdditionalInformation command in the Event Sourcing system
---
.../distributed/TasksSerializationModule.java | 11 ++++++++-
.../eventsourcing/distributed/TaskEventDTO.scala | 19 +++++++++++++++
.../apache/james/task/SerialTaskManagerWorker.java | 2 +-
.../james/task/eventsourcing/CommandHandlers.scala | 8 +++++++
.../task/eventsourcing/DecisionProjection.scala | 27 +++++++++++-----------
.../eventsourcing/EventSourcingTaskManager.scala | 3 ++-
.../task/eventsourcing/WorkerStatusListener.scala | 6 ++---
.../james/task/SerialTaskManagerWorkerTest.java | 5 ++--
8 files changed, 59 insertions(+), 22 deletions(-)
diff --git a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
index 07cadec..ed2a80a 100644
--- a/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
+++ b/server/task/task-distributed/src/main/java/org/apache/james/task/eventsourcing/distributed/TasksSerializationModule.java
@@ -26,6 +26,7 @@ import java.util.stream.Stream;
import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
import org.apache.james.server.task.json.JsonTaskAdditionalInformationsSerializer;
import org.apache.james.server.task.json.JsonTaskSerializer;
+import org.apache.james.task.eventsourcing.AdditionalInformationUpdated;
import org.apache.james.task.eventsourcing.CancelRequested;
import org.apache.james.task.eventsourcing.Cancelled;
import org.apache.james.task.eventsourcing.Completed;
@@ -90,8 +91,16 @@ public interface TasksSerializationModule {
.typeName("task-manager-cancelled")
.withFactory(EventDTOModule::new);
+ TaskSerializationModuleFactory<AdditionalInformationUpdated, AdditionalInformationUpdatedDTO> UPDATED = (jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer) -> EventDTOModule
+ .forEvent(AdditionalInformationUpdated.class)
+ .convertToDTO(AdditionalInformationUpdatedDTO.class)
+ .toDomainObjectConverter(dto -> dto.toDomainObject(jsonTaskAdditionalInformationsSerializer))
+ .toDTOConverter((event, typeName) -> AdditionalInformationUpdatedDTO.fromDomainObject(jsonTaskAdditionalInformationsSerializer, event, typeName))
+ .typeName("task-manager-updated")
+ .withFactory(EventDTOModule::new);
+
BiFunction<JsonTaskSerializer, JsonTaskAdditionalInformationsSerializer, List<EventDTOModule<?, ?>>> MODULES = (jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer) -> Stream
- .of(CREATED, STARTED, CANCEL_REQUESTED, CANCELLED, COMPLETED, FAILED)
+ .of(CREATED, STARTED, CANCEL_REQUESTED, CANCELLED, COMPLETED, FAILED, UPDATED)
.map(moduleFactory -> moduleFactory.create(jsonTaskSerializer, jsonTaskAdditionalInformationsSerializer))
.collect(Guavate.toImmutableList());
}
diff --git a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
index 978d08e..4730a4c 100644
--- a/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
+++ b/server/task/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
@@ -27,6 +27,7 @@ import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO
import org.apache.james.server.task.json.{JsonTaskAdditionalInformationsSerializer, JsonTaskSerializer}
import org.apache.james.task.eventsourcing._
import org.apache.james.task.{Hostname, Task, TaskId}
+
import scala.compat.java8.OptionConverters._
sealed abstract class TaskEventDTO(val getType: String, val getAggregate: String, val getEvent: Int) extends EventDTO {
@@ -140,3 +141,21 @@ object CancelledDTO {
CancelledDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializedAdditionalInformations)
}
}
+
+case class AdditionalInformationUpdatedDTO(@JsonProperty("type") typeName: String,
+ @JsonProperty("aggregate") aggregateId: String,
+ @JsonProperty("event") eventId: Int,
+ @JsonProperty("additionalInformation") getAdditionalInformation: String)
+ extends TaskEventDTO(typeName, aggregateId, eventId) {
+ def toDomainObject(jsonTaskAdditionalInformationsSerializer: JsonTaskAdditionalInformationsSerializer): AdditionalInformationUpdated = {
+ val deserializedAdditionalInformation = jsonTaskAdditionalInformationsSerializer.deserialize(getAdditionalInformation)
+ AdditionalInformationUpdated(domainAggregateId, domainEventId, deserializedAdditionalInformation)
+ }
+}
+
+object AdditionalInformationUpdatedDTO {
+ def fromDomainObject(jsonTaskAdditionalInformationsSerializer: JsonTaskAdditionalInformationsSerializer)(event: AdditionalInformationUpdated, typeName: String): AdditionalInformationUpdatedDTO = {
+ val serializedAdditionalInformations = jsonTaskAdditionalInformationsSerializer.serialize(event.additionalInformation)
+ AdditionalInformationUpdatedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), serializedAdditionalInformations)
+ }
+}
diff --git a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
index 8fb0e15..60b5a16 100644
--- a/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
+++ b/server/task/task-memory/src/main/java/org/apache/james/task/SerialTaskManagerWorker.java
@@ -114,7 +114,7 @@ public class SerialTaskManagerWorker implements TaskManagerWorker {
.delayElement(Duration.ofSeconds(1))
.repeat()
.flatMap(Mono::justOrEmpty)
- .doOnNext(information -> listener.updated(taskWithId.getId(), taskWithId.getTask().type(), information));
+ .doOnNext(information -> listener.updated(taskWithId.getId(), information));
}
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index 1184988..9db161e 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -81,4 +81,12 @@ class FailCommandHandler(private val loadHistory: TaskAggregateId => History) ex
override def handle(command: Fail): util.List[_ <: Event] = {
loadAggregate(loadHistory, command.id).fail(command.additionalInformation, command.errorMessage, command.exception)
}
+}
+
+class UpdateCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[UpdateAdditionalInformation] {
+ override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation]
+
+ override def handle(command: UpdateAdditionalInformation): util.List[_ <: Event] = {
+ loadAggregate(loadHistory, command.id).update(command.additionalInformation)
+ }
}
\ No newline at end of file
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
index d5394bb..b5a460f 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
@@ -22,22 +22,23 @@ import org.apache.james.eventsourcing.Event
import org.apache.james.task.TaskManager.Status
case class DecisionProjection(status: Option[Status]) {
- val update: Event => DecisionProjection =
- DecisionProjection.create
+ def update(event: Event): DecisionProjection = {
+ DecisionProjection(
+ event match {
+ case event: Created => Some(Status.WAITING)
+ case event: Started => Some(Status.IN_PROGRESS)
+ case event: CancelRequested => Some(Status.CANCEL_REQUESTED)
+ case event: Cancelled => Some(Status.CANCELLED)
+ case event: Completed => Some(Status.COMPLETED)
+ case event: Failed => Some(Status.FAILED)
+ case event: AdditionalInformationUpdated => status
+ }
+ )
+ }
+
}
object DecisionProjection {
- def create(event: Event): DecisionProjection = DecisionProjection(Some(fromEvent(event)))
-
def empty: DecisionProjection = DecisionProjection(None)
-
- private def fromEvent(event: Event): Status = event match {
- case event: Created => Status.WAITING
- case event: Started => Status.IN_PROGRESS
- case event: CancelRequested => Status.CANCEL_REQUESTED
- case event: Cancelled => Status.CANCELLED
- case event: Completed => Status.COMPLETED
- case event: Failed => Status.FAILED
- }
}
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 20cf21d..35856b0 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -59,7 +59,8 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
new RequestCancelCommandHandler(loadHistory, hostname),
new CompleteCommandHandler(loadHistory),
new CancelCommandHandler(loadHistory),
- new FailCommandHandler(loadHistory)),
+ new FailCommandHandler(loadHistory),
+ new UpdateCommandHandler(loadHistory)),
subscribers = Set(
executionDetailsProjection.asSubscriber(hostname),
workDispatcher,
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index b6a3f80..aa93d82 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -21,13 +21,12 @@ package org.apache.james.task.eventsourcing
import java.util.Optional
+import com.google.common.base.Throwables
import org.apache.james.eventsourcing.EventSourcingSystem
import org.apache.james.task.Task.Result
import org.apache.james.task.eventsourcing.TaskCommand._
import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker}
-import com.google.common.base.Throwables
-
import scala.compat.java8.OptionConverters._
case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
@@ -49,5 +48,6 @@ case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extend
override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Unit =
eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala ))
- override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit = ???
+ override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Unit =
+ eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation))
}
\ No newline at end of file
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
index 7448a20..b693347 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/SerialTaskManagerWorkerTest.java
@@ -100,10 +100,9 @@ class SerialTaskManagerWorkerTest {
worker.executeTask(taskWithId).block();
- verify(listener, atMost(2)).updated(eq(taskWithId.getId()), notNull());
+ verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
}
-
@Test
void aRunningTaskShouldEmitAtMostOneInformationPerSecond() {
TaskWithId taskWithId = new TaskWithId(TaskId.generateTaskId(), new MemoryReferenceWithCounterTask((counter) ->
@@ -115,7 +114,7 @@ class SerialTaskManagerWorkerTest {
worker.executeTask(taskWithId).block();
- verify(listener, times(2)).updated(eq(taskWithId.getId()), notNull());
+ verify(listener, atMost(3)).updated(eq(taskWithId.getId()), notNull());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org