You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:49 UTC

[james-project] 16/17: JAMES-2813 use timestamp in additional information to refuse stalled update event

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 1da890c47c4cc0eae8686a304bf2a923bec60819
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Oct 15 17:54:38 2019 +0200

    JAMES-2813 use timestamp in additional information to refuse stalled update event
---
 .../task/eventsourcing/DecisionProjection.scala    | 24 +++++++------
 .../james/task/eventsourcing/TaskAggregate.scala   | 21 ++++++-----
 .../task/eventsourcing/TaskAggregateTest.java      | 41 ++++++++++++++++++----
 3 files changed, 58 insertions(+), 28 deletions(-)

diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
index b5a460f..ce2f6d2 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
@@ -18,27 +18,29 @@
   * ***************************************************************/
 package org.apache.james.task.eventsourcing
 
+import java.time.Instant
+
 import org.apache.james.eventsourcing.Event
 import org.apache.james.task.TaskManager.Status
 
-case class DecisionProjection(status: Option[Status]) {
+case class DecisionProjection(status: Status, latestUpdateAdditionalInformationUpdate : Option[Instant]) {
   def update(event: Event): DecisionProjection = {
-    DecisionProjection(
       event match {
-        case event: Created => Some(Status.WAITING)
-        case event: Started => Some(Status.IN_PROGRESS)
-        case event: CancelRequested => Some(Status.CANCEL_REQUESTED)
-        case event: Cancelled => Some(Status.CANCELLED)
-        case event: Completed => Some(Status.COMPLETED)
-        case event: Failed => Some(Status.FAILED)
-        case event: AdditionalInformationUpdated => status
+        case _: Created => this
+        case _: Started => DecisionProjection(Status.IN_PROGRESS, None)
+        case _: CancelRequested => DecisionProjection(Status.CANCEL_REQUESTED,latestUpdateAdditionalInformationUpdate)
+        case event: Cancelled => DecisionProjection(Status.CANCELLED, event.additionalInformation.map(_.timestamp))
+        case event: Completed => DecisionProjection(Status.COMPLETED, event.additionalInformation.map(_.timestamp))
+        case event: Failed => DecisionProjection(Status.FAILED, event.additionalInformation.map(_.timestamp))
+        case event: AdditionalInformationUpdated => DecisionProjection(status, Some(event.additionalInformation.timestamp))
       }
-    )
   }
 
+  def additionalInformationIsOlderThan(timestamp: Instant) : Boolean = latestUpdateAdditionalInformationUpdate.forall(timestamp.isAfter)
+
 }
 
 object DecisionProjection {
-  def empty: DecisionProjection = DecisionProjection(None)
+  def initial(created : Created): DecisionProjection = DecisionProjection(Status.WAITING, None)
 }
 
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 1a4bbbb..d73cf65 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -31,22 +31,21 @@ import scala.collection.JavaConverters._
 
 class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) {
 
-  history.getEvents.asScala.headOption match {
-    case Some(Created(_, _, _, _)) =>
+  val initialEvent = history.getEvents.asScala.headOption match {
+    case Some(created @ Created(_, _, _, _)) => created
     case _ => throw new IllegalArgumentException("History must start with Created event")
   }
 
-  private val currentStatus: Status = history
+  private val currentDecisionProjection: DecisionProjection = history
     .getEvents
     .asScala
-    .foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event))
-    .status
-    .get
+    .tail
+    .foldLeft(DecisionProjection.initial(initialEvent))((decision, event) => decision.update(event))
 
   private def optionToJavaList[T](element: Option[T]): util.List[T] = element.toList.asJava
 
   private def createEventIfNotFinished(event: EventId => Event): Option[Event] = {
-    if (!currentStatus.isFinished) {
+    if (!currentDecisionProjection.status.isFinished) {
       Some(event(history.getNextEventId))
     } else
       None
@@ -63,11 +62,11 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor
     createEventIfNotFinishedAsJavaList(CancelRequested(aggregateId, _, hostname))
 
   private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] =
-    (currentStatus match {
-      case Status.IN_PROGRESS => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
-      case Status.CANCEL_REQUESTED => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+    optionToJavaList(currentDecisionProjection.status match {
+      case Status.IN_PROGRESS if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+      case Status.CANCEL_REQUESTED if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
       case _ => None
-    }).toList.asJava
+    })
 
   private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] =
     createEventIfNotFinishedAsJavaList(Completed(aggregateId, _, result, additionalInformation))
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
index bb98b26..e12f8bc 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
@@ -66,7 +66,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+    void givenNoStartedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME)
         );
@@ -75,7 +75,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenInProgressTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+    void givenInProgressTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
             eventId -> Started.apply(ID, eventId, HOSTNAME)
@@ -86,7 +86,36 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenCancelRequestedTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+    void givenInProgressTaskWithOneNewerUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand() {
+        History history = buildHistory(
+            eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+            eventId -> Started.apply(ID, eventId, HOSTNAME),
+            eventId -> AdditionalInformationUpdated.apply(ID, eventId, new MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp))
+        );
+        TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+        Instant newEventTime = TaskAggregateTest.timestamp.plusSeconds(3);
+        MemoryReferenceWithCounterTask.AdditionalInformation youngerAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, newEventTime);
+        assertThat(aggregate.update(youngerAdditionalInformation))
+            .isNotEmpty()
+            .anySatisfy(event -> assertThat(event)
+                .isInstanceOfSatisfying(AdditionalInformationUpdated.class,
+                    additionalInformationUpdated -> assertThat(additionalInformationUpdated.additionalInformation().timestamp()).isEqualTo(newEventTime)));
+    }
+
+    @Test
+    void givenInProgressTaskWithOneStalledUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand() {
+        History history = buildHistory(
+            eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+            eventId -> Started.apply(ID, eventId, HOSTNAME),
+            eventId -> AdditionalInformationUpdated.apply(ID, eventId, new MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp))
+        );
+        TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+        MemoryReferenceWithCounterTask.AdditionalInformation olderAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp.minusSeconds(3));
+        assertThat(aggregate.update(olderAdditionalInformation)).isEmpty();
+    }
+
+    @Test
+    void givenCancelRequestedTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() {
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
             eventId -> Started.apply(ID, eventId, HOSTNAME),
@@ -98,7 +127,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenCompletedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+    void givenCompletedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),
@@ -110,7 +139,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenFailedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+    void givenFailedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),
@@ -122,7 +151,7 @@ class TaskAggregateTest {
     }
 
     @Test
-    void givenCancelTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+    void givenCancelTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
         MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
         History history = buildHistory(
             eventId -> Created.apply(ID, eventId, task, HOSTNAME),


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org