You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2019/10/18 06:40:49 UTC
[james-project] 16/17: JAMES-2813 use timestamp in additional
information to refuse stalled update event
This is an automated email from the ASF dual-hosted git repository.
rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1da890c47c4cc0eae8686a304bf2a923bec60819
Author: Rémi KOWALSKI <rk...@linagora.com>
AuthorDate: Tue Oct 15 17:54:38 2019 +0200
JAMES-2813 use timestamp in additional information to refuse stalled update event
---
.../task/eventsourcing/DecisionProjection.scala | 24 +++++++------
.../james/task/eventsourcing/TaskAggregate.scala | 21 ++++++-----
.../task/eventsourcing/TaskAggregateTest.java | 41 ++++++++++++++++++----
3 files changed, 58 insertions(+), 28 deletions(-)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
index b5a460f..ce2f6d2 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/DecisionProjection.scala
@@ -18,27 +18,29 @@
* ***************************************************************/
package org.apache.james.task.eventsourcing
+import java.time.Instant
+
import org.apache.james.eventsourcing.Event
import org.apache.james.task.TaskManager.Status
-case class DecisionProjection(status: Option[Status]) {
+case class DecisionProjection(status: Status, latestUpdateAdditionalInformationUpdate : Option[Instant]) {
def update(event: Event): DecisionProjection = {
- DecisionProjection(
event match {
- case event: Created => Some(Status.WAITING)
- case event: Started => Some(Status.IN_PROGRESS)
- case event: CancelRequested => Some(Status.CANCEL_REQUESTED)
- case event: Cancelled => Some(Status.CANCELLED)
- case event: Completed => Some(Status.COMPLETED)
- case event: Failed => Some(Status.FAILED)
- case event: AdditionalInformationUpdated => status
+ case _: Created => this
+ case _: Started => DecisionProjection(Status.IN_PROGRESS, None)
+ case _: CancelRequested => DecisionProjection(Status.CANCEL_REQUESTED,latestUpdateAdditionalInformationUpdate)
+ case event: Cancelled => DecisionProjection(Status.CANCELLED, event.additionalInformation.map(_.timestamp))
+ case event: Completed => DecisionProjection(Status.COMPLETED, event.additionalInformation.map(_.timestamp))
+ case event: Failed => DecisionProjection(Status.FAILED, event.additionalInformation.map(_.timestamp))
+ case event: AdditionalInformationUpdated => DecisionProjection(status, Some(event.additionalInformation.timestamp))
}
- )
}
+ def additionalInformationIsOlderThan(timestamp: Instant) : Boolean = latestUpdateAdditionalInformationUpdate.forall(timestamp.isAfter)
+
}
object DecisionProjection {
- def empty: DecisionProjection = DecisionProjection(None)
+ def initial(created : Created): DecisionProjection = DecisionProjection(Status.WAITING, None)
}
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 1a4bbbb..d73cf65 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -31,22 +31,21 @@ import scala.collection.JavaConverters._
class TaskAggregate private(val aggregateId: TaskAggregateId, private val history: History) {
- history.getEvents.asScala.headOption match {
- case Some(Created(_, _, _, _)) =>
+ val initialEvent = history.getEvents.asScala.headOption match {
+ case Some(created @ Created(_, _, _, _)) => created
case _ => throw new IllegalArgumentException("History must start with Created event")
}
- private val currentStatus: Status = history
+ private val currentDecisionProjection: DecisionProjection = history
.getEvents
.asScala
- .foldLeft(DecisionProjection.empty)((decision, event) => decision.update(event))
- .status
- .get
+ .tail
+ .foldLeft(DecisionProjection.initial(initialEvent))((decision, event) => decision.update(event))
private def optionToJavaList[T](element: Option[T]): util.List[T] = element.toList.asJava
private def createEventIfNotFinished(event: EventId => Event): Option[Event] = {
- if (!currentStatus.isFinished) {
+ if (!currentDecisionProjection.status.isFinished) {
Some(event(history.getNextEventId))
} else
None
@@ -63,11 +62,11 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor
createEventIfNotFinishedAsJavaList(CancelRequested(aggregateId, _, hostname))
private[eventsourcing] def update(additionalInformation: AdditionalInformation): util.List[Event] =
- (currentStatus match {
- case Status.IN_PROGRESS => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
- case Status.CANCEL_REQUESTED => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+ optionToJavaList(currentDecisionProjection.status match {
+ case Status.IN_PROGRESS if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
+ case Status.CANCEL_REQUESTED if currentDecisionProjection.additionalInformationIsOlderThan(additionalInformation.timestamp) => createEvent(AdditionalInformationUpdated(aggregateId, _, additionalInformation))
case _ => None
- }).toList.asJava
+ })
private[eventsourcing] def complete(result: Result, additionalInformation: Option[AdditionalInformation]): util.List[Event] =
createEventIfNotFinishedAsJavaList(Completed(aggregateId, _, result, additionalInformation))
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
index bb98b26..e12f8bc 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TaskAggregateTest.java
@@ -66,7 +66,7 @@ class TaskAggregateTest {
}
@Test
- void givenNoStartedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ void givenNoStartedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
History history = buildHistory(
eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME)
);
@@ -75,7 +75,7 @@ class TaskAggregateTest {
}
@Test
- void givenInProgressTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+ void givenInProgressTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() {
History history = buildHistory(
eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
eventId -> Started.apply(ID, eventId, HOSTNAME)
@@ -86,7 +86,36 @@ class TaskAggregateTest {
}
@Test
- void givenCancelRequestedTaskEmitEventWhenUpdateAdditionalInformationCommand() {
+ void givenInProgressTaskWithOneNewerUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand() {
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+ eventId -> Started.apply(ID, eventId, HOSTNAME),
+ eventId -> AdditionalInformationUpdated.apply(ID, eventId, new MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp))
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ Instant newEventTime = TaskAggregateTest.timestamp.plusSeconds(3);
+ MemoryReferenceWithCounterTask.AdditionalInformation youngerAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, newEventTime);
+ assertThat(aggregate.update(youngerAdditionalInformation))
+ .isNotEmpty()
+ .anySatisfy(event -> assertThat(event)
+ .isInstanceOfSatisfying(AdditionalInformationUpdated.class,
+ additionalInformationUpdated -> assertThat(additionalInformationUpdated.additionalInformation().timestamp()).isEqualTo(newEventTime)));
+ }
+
+ @Test
+ void givenInProgressTaskWithOneStalledUpdateShouldEmitEventWhenUpdateAdditionalInformationCommand() {
+ History history = buildHistory(
+ eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
+ eventId -> Started.apply(ID, eventId, HOSTNAME),
+ eventId -> AdditionalInformationUpdated.apply(ID, eventId, new MemoryReferenceWithCounterTask.AdditionalInformation(1, timestamp))
+ );
+ TaskAggregate aggregate = TaskAggregate.fromHistory(ID, history);
+ MemoryReferenceWithCounterTask.AdditionalInformation olderAdditionalInformation = new MemoryReferenceWithCounterTask.AdditionalInformation(3, timestamp.minusSeconds(3));
+ assertThat(aggregate.update(olderAdditionalInformation)).isEmpty();
+ }
+
+ @Test
+ void givenCancelRequestedTaskShouldEmitEventWhenUpdateAdditionalInformationCommand() {
History history = buildHistory(
eventId -> Created.apply(ID, eventId, new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED), HOSTNAME),
eventId -> Started.apply(ID, eventId, HOSTNAME),
@@ -98,7 +127,7 @@ class TaskAggregateTest {
}
@Test
- void givenCompletedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ void givenCompletedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
History history = buildHistory(
eventId -> Created.apply(ID, eventId, task, HOSTNAME),
@@ -110,7 +139,7 @@ class TaskAggregateTest {
}
@Test
- void givenFailedTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ void givenFailedTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
History history = buildHistory(
eventId -> Created.apply(ID, eventId, task, HOSTNAME),
@@ -122,7 +151,7 @@ class TaskAggregateTest {
}
@Test
- void givenCancelTaskEmitNoEventWhenUpdateAdditionalInformationCommand() {
+ void givenCancelTaskShouldEmitNoEventWhenUpdateAdditionalInformationCommand() {
MemoryReferenceWithCounterTask task = new MemoryReferenceWithCounterTask((counter) -> Task.Result.COMPLETED);
History history = buildHistory(
eventId -> Created.apply(ID, eventId, task, HOSTNAME),
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org