You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:35 UTC

[james-project] 02/17: JAMES-2813 refactor TaskAggregate to prevent creation of the aggregate if History doesn't start with Created event

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit ad2c55cd985fd4f55fb21f67d78941a0ed169fbb
Author: Matthieu Baechler <ma...@apache.org>
AuthorDate: Tue Oct 8 10:25:30 2019 +0200

    JAMES-2813 refactor TaskAggregate to prevent creation of the aggregate if History doesn't start with Created event
---
 .../james/task/eventsourcing/CommandHandlers.scala |  2 +-
 .../james/task/eventsourcing/TaskAggregate.scala   | 68 ++++++++++++----------
 .../task/eventsourcing/TaskAggregateTest.java      | 21 +++++--
 3 files changed, 55 insertions(+), 36 deletions(-)

diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index ddda76a..1184988 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -37,7 +37,7 @@ class CreateCommandHandler(private val loadHistory: TaskAggregateId => History,
   override def handledClass: Class[Create] = classOf[Create]
 
   override def handle(command: Create): util.List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).create(command.task, hostname)
+    TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname)
   }
 }
 
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 0c158ba..1a2094b 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -31,63 +31,67 @@ import scala.collection.JavaConverters._
 
 class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) {
 
-  private val currentStatus: Option[Status] = history
+  history.getEvents.asScala.headOption match {
+    case Some(Created(_, _, _, _)) =>
+    case _ => throw new IllegalArgumentException("History must start with Created event")
+  }
+
+  private val currentStatus: Status = history
     .getEvents
     .asScala
     .foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event))
     .status
-
-
-  def create(task: Task, hostname: Hostname): util.List[Event] = {
-    if (currentStatus.isEmpty) {
-      createEventWithId(Created(aggregateId, _, task, hostname))
-    } else Nil.asJava
-  }
+    .get
 
   private[eventsourcing] def start(hostname: Hostname): util.List[Event] = {
-    currentStatus match {
-      case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _, hostname))
-      case _ => Nil.asJava
+    if (!currentStatus.isFinished) {
+      createEventWithId(Started(aggregateId, _, hostname))
+    } else {
+      Nil.asJava
     }
   }
 
   def requestCancel(hostname: Hostname): util.List[Event] = {
-    currentStatus match {
-      case Some(status) if !status.isFinished => createEventWithId(CancelRequested(aggregateId, _, hostname))
-      case _ => Nil.asJava
+    if (!currentStatus.isFinished) {
+      createEventWithId(CancelRequested(aggregateId, _, hostname))
+    } else {
+      Nil.asJava
     }
   }
 
   private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] = {
     currentStatus match {
-      case Some(Status.IN_PROGRESS) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
-      case Some(Status.CANCEL_REQUESTED) => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
-      case Some(Status.COMPLETED) => Nil.asJava
-      case Some(Status.FAILED) => Nil.asJava
-      case Some(Status.WAITING) => Nil.asJava
-      case Some(Status.CANCELLED) => Nil.asJava
+      case Status.IN_PROGRESS => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+      case Status.CANCEL_REQUESTED => createEventWithId(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+      case Status.COMPLETED => Nil.asJava
+      case Status.FAILED => Nil.asJava
+      case Status.WAITING => Nil.asJava
+      case Status.CANCELLED => Nil.asJava
       case _ => Nil.asJava
     }
   }
 
   private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] = {
-    currentStatus match {
-      case Some(status) if !status.isFinished => createEventWithId(Completed(aggregateId, _, result, additionalInformation))
-      case _ => Nil.asJava
+    if (!currentStatus.isFinished) {
+      createEventWithId(Completed(aggregateId, _, result, additionalInformation))
+    } else {
+      Nil.asJava
     }
   }
 
   private[eventsourcing] def fail(additionalInformation: Option[AdditionalInformation], errorMessage: Option[String], exception: Option[String]): util.List[Event] = {
-    currentStatus match {
-      case Some(status) if !status.isFinished => createEventWithId(Failed(aggregateId, _, additionalInformation, errorMessage, exception))
-      case _ => Nil.asJava
+    if (!currentStatus.isFinished) {
+      createEventWithId(Failed(aggregateId, _, additionalInformation, errorMessage, exception))
+    } else {
+      Nil.asJava
     }
   }
 
   private[eventsourcing] def cancel(additionalInformation: Option[AdditionalInformation]): util.List[Event] = {
-    currentStatus match {
-      case Some(status) if !status.isFinished => createEventWithId(Cancelled(aggregateId, _, additionalInformation))
-      case _ => Nil.asJava
+    if (!currentStatus.isFinished) {
+      createEventWithId(Cancelled(aggregateId, _, additionalInformation))
+    } else {
+      Nil.asJava
     }
   }
 
@@ -98,5 +102,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor
 }
 
 object TaskAggregate {
-  def fromHistory(aggregateId: TaskAggregateId, history: History) = new TaskAggregate(aggregateId, history)
+  def fromHistory(aggregateId: TaskAggregateId, history: History): TaskAggregate = new TaskAggregate(aggregateId, history)
+
+  def create(aggregateId: TaskAggregateId, task: Task, hostname: Hostname): util.List[Event] = {
+    List[Event](Created(aggregateId, EventId.first(), task, hostname)).asJava
+  }
 }
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
index 03797da..075deda 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
@@ -19,6 +19,7 @@
 package org.apache.james.task.eventsourcing;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 import java.util.Arrays;
 import java.util.function.Function;
@@ -35,8 +36,7 @@ import org.apache.james.task.TaskId;
 import org.junit.jupiter.api.Test;
 
 import com.google.common.collect.Streams;
-import scala.None;
-import scala.None$;
+import scala.Option;
 
 class TaskAggregateTest {
 
@@ -53,6 +53,17 @@ class TaskAggregateTest {
     }
 
     @Test
+    void TaskAggregateShouldThrowWhenHistoryDoesntStartWithCreatedEvent() {
+        assertThatThrownBy(() -> TaskAggregate.fromHistory(ID, buildHistory(eventId -> Started.apply(ID, eventId, HOSTNAME))))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    void TaskAggregateShouldThrowWhenEmptyHistory() {
+        assertThatThrownBy(() -> TaskAggregate.fromHistory(ID, History.empty())).isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
     void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME)
@@ -90,7 +101,7 @@ class TaskAggregateTest {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),
             eventId -> Started.apply(ID, eventId, HOSTNAME),
-            eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, task.type(), None$.empty())
+            eventId -> Completed.apply(ID, eventId, Task.Result.COMPLETED, Option.empty())
         );
         TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
         assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
@@ -102,7 +113,7 @@ class TaskAggregateTest {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),
             eventId -> Started.apply(ID, eventId, HOSTNAME),
-            eventId -> Failed.apply(ID, eventId, task.type(), None$.empty())
+            eventId -> Failed.apply(ID, eventId, Option.empty(), Option.empty(), Option.empty())
         );
         TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
         assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();
@@ -114,7 +125,7 @@ class TaskAggregateTest {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),
             eventId -> Started.apply(ID, eventId, HOSTNAME),
-            eventId -> Cancelled.apply(ID, eventId, task.type(), None$.empty())
+            eventId -> Cancelled.apply(ID, eventId, Option.empty())
         );
         TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
         assertThat(aggregate.update(new MemoryReferenceWithCounterTask.AdditionalInformation(3))).isEmpty();


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org