You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:35 UTC
[james-project] 02/17: JAMES-2813 refactor TaskAggregate to prevent creation of the aggregate if History doesn't start with Created event
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit ad2c55cd985fd4f55fb21f67d78941a0ed169fbb
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Oct 8 10:25:30 2019 +0200
JAMES-2813 refactor TaskAggregate to prevent creation of the aggregate if History doesn't start with Created event
---
.../james/task/eventsourcing/CommandHandlers.scala | 2 +-
.../james/task/eventsourcing/TaskAggregate.scala | 68 ++++++++++++----------
.../task/eventsourcing/TaskAggregateTest.java | 21 +++++--
3 files changed, 55 insertions(+), 36 deletions(-)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index ddda76a..1184988 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -37,7 +37,7 @@ class CreateCommandHandler(private val loadHistory: TaskAggregateId => History,
override def handledClass: Class[Create] = classOf[Create]
override def handle(command: Create): util.List[_ <: Event] = {
- loadAggregate(loadHistory, command.id).create(command.task, hostname)
+ TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname)
}
}
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 0c158ba..1a2094b 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -31,63 +31,67 @@ import scala.collection.JavaConverters._
class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) {
- private val currentStatus: Option[Status] = history
+ history.getEvents.asScala.headOption match {
+ case Some(Created(_, _, _, _)) =>
+ case _ => throw new IllegalArgumentException("History must start with Created event")
+ }
+
+ private val currentStatus: Status = history
.getEvents
.asScala
.foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event))
.status
-
-
- def create(task: Task, hostname: Hostname): util.List[Event] = {
- if (currentStatus.isEmpty) {
- createEventWithId(Created(aggregateId, _, task, hostname))
- } else Nil.asJava
- }
+ .get
private[eventsourcing] def start(hostname: Hostname): util.List[Event] = {
- currentStatus match {
- case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _, hostname))
- case _ => Nil.asJava
+ if (!currentStatus.isFinished) {
+ createEventWithId(Started(aggregateId, _, hostname))
+ } else {
+ Nil.asJava
}
}
def requestCancel(hostname: Hostname): util.List[Event] = {
- currentStatus match {
- case Some(status) if !status.isFinished => createEventWithId(CancelRequested(aggregateId, _, hostname))
- case _ => Nil.asJava
+ if (!currentStatus.isFinished) {
+ createEventWithId(CancelRequested(aggregateId, _, hostname))
+ } else {
+ Nil.asJava
}
}
private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = {
currentStatus match {
- case Some(Status.IN_PROGRESS) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
- case Some(Status.CANCEL_REQUESTED) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
- case Some(Status.COMPLETED) => Nil.asJava
- case Some(Status.FAILED) => Nil.asJava
- case Some(Status.WAITING) => Nil.asJava
- case Some(Status.CANCELLED) => Nil.asJava
+ case Status.IN_PROGRESS => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+ case Status.CANCEL_REQUESTED => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+ case Status.COMPLETED => Nil.asJava
+ case Status.FAILED => Nil.asJava
+ case Status.WAITING => Nil.asJava
+ case Status.CANCELLED => Nil.asJava
case _ => Nil.asJava
}
}
private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = {
- currentStatus match {
- case Some(status) if !status.isFinished => createEventWithId(Completed(aggregateId, _, result, additionalInformation))
- case _ => Nil.asJava
+ if (!currentStatus.isFinished) {
+ createEventWithId(Completed(aggregateId, _, result, additionalInformation))
+ } else {
+ Nil.asJava
}
}
private[eventsourcing] def fail(additionalInformation: Option[AdditionalInformation], errorMessage: Option[String], exception: Option[String]): util.List[Event] = {
- currentStatus match {
- case Some(status) if !status.isFinished => createEventWithId(Failed(aggregateId, _, additionalInformation, errorMessage, exception))
- case _ => Nil.asJava
+ if (!currentStatus.isFinished) {
+ createEventWithId(Failed(aggregateId, _, additionalInformation, errorMessage, exception))
+ } else {
+ Nil.asJava
}
}
private[eventsourcing] def cancel(additionalInformation: Option[AdditionalInformation]): util.List[Event] = {
- currentStatus match {
- case Some(status) if !status.isFinished => createEventWithId(Cancelled(aggregateId, _, additionalInformation))
- case _ => Nil.asJava
+ if (!currentStatus.isFinished) {
+ createEventWithId(Cancelled(aggregateId, _, additionalInformation))
+ } else {
+ Nil.asJava
}
}
@@ -98,5 +102,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor
}
object TaskAggregate {
- def fromHistory(aggregateId: TaskAggregateId, history: History) = new TaskAggregate(aggregateId, history)
+ def fromHistory(aggregateId: TaskAggregateId, history: History): TaskAggregate = new TaskAggregate(aggregateId, history)
+
+ def create(aggregateId: TaskAggregateId, task: Task, hostname: Hostname): util.List[Event] = {
+ List[Event](Created(aggregateId, EventId.first(), task, hostname)).asJava
+ }
}
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
index 03797da..075deda 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
@@ -19,6 +19,7 @@
package org.apache.james.task.eventsourcing;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
import java.util.Arrays;
import java.util.function.Function;
@@ -35,8 +36,7 @@ import org.apache.james.task.TaskId;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Streams;
-import scala.None;
-import scala.None$;
+import scala.Option;
class TaskAggregateTest {
@@ -53,6 +53,17 @@ class TaskAggregateTest {
}
@Test
+ void TaskAggregateShouldThrowWhenHistoryDoesntStartWithCreatedEvent() {
+ assertThatThrownBy(() -> TaskAggregate.fromHistory(ID, buildHistory(eventId -> Started.apply(ID, eventId, HOSTNAME))))
+ .isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
+ void TaskAggregateShouldThrowWhenEmptyHistory() {
+ assertThatThrownBy(() -> TaskAggregate.fromHistory(ID, History.empty())).isInstanceOf(IllegalArgumentException.class);
+ }
+
+ @Test
void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
History history = buildHistory(
eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME)
@@ -90,7 +101,7 @@ class TaskAggregateTest {
History history = buildHistory(
eventId -> Created.apply(ID, eventId, task, HOSTNAME),
eventId -> Started.apply(ID, eventId, HOSTNAME),
- eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, task.type(), None$.empty())
+ eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, Option.empty())
);
TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
@@ -102,7 +113,7 @@ class TaskAggregateTest {
History history = buildHistory(
eventId -> Created.apply(ID, eventId, task, HOSTNAME),
eventId -> Started.apply(ID, eventId, HOSTNAME),
- eventId -> Failed.apply(ID, eventId, task.type(), None$.empty())
+ eventId -> Failed.apply(ID, eventId, Option.empty(), Option.empty(), Option.empty())
);
TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
@@ -114,7 +125,7 @@ class TaskAggregateTest {
History history = buildHistory(
eventId -> Created.apply(ID, eventId, task, HOSTNAME),
eventId -> Started.apply(ID, eventId, HOSTNAME),
- eventId -> Cancelled.apply(ID, eventId, task.type(), None$.empty())
+ eventId -> Cancelled.apply(ID, eventId, Option.empty())
);
TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org