You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/12/07 03:57:36 UTC

[james-project] 06/13: JAMES-2393 Dispatching an EventSourcing command can return the generated events

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9449c8353182e4634bfd1bf5f586fdb2697558d5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 4 12:00:20 2020 +0700

    JAMES-2393 Dispatching an EventSourcing command can return the generated events
    
    This enables callers to reuse the state of the transaction (the generated events)
---
 .../apache/james/eventsourcing/CommandDispatcher.scala    | 10 ++++------
 .../apache/james/eventsourcing/EventSourcingSystem.scala  |  4 +++-
 .../mailing/listeners/QuotaThresholdCrossingListener.java |  3 ++-
 .../filtering/impl/EventSourcingFilteringManagement.java  |  2 +-
 .../james/task/eventsourcing/WorkerStatusListener.scala   | 15 +++++++--------
 5 files changed, 17 insertions(+), 17 deletions(-)

diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
index 05abc6f..a547bc4 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
@@ -20,12 +20,10 @@ package org.apache.james.eventsourcing
 
 import java.util
 
+import com.google.common.base.Preconditions
 import javax.inject.Inject
-
 import org.apache.james.eventsourcing.eventstore.EventStoreFailedException
 import org.reactivestreams.Publisher
-
-import com.google.common.base.Preconditions
 import reactor.core.scala.publisher.SMono
 
 import scala.jdk.CollectionConverters._
@@ -51,7 +49,7 @@ object CommandDispatcher {
 class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandler[_ <: Command]]) {
   Preconditions.checkArgument(hasOnlyOneHandlerByCommand(handlers), CommandDispatcher.ONLY_ONE_HANDLER_PRECONDITION)
 
-  def dispatch(c: Command): Publisher[Void] = {
+  def dispatch(c: Command): Publisher[util.List[_ <: Event]] = {
     tryDispatch(c)
       .retry(CommandDispatcher.MAX_RETRY, {
         case _: EventStoreFailedException => true
@@ -71,11 +69,11 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl
   private val handlersByClass: Map[Class[_ <: Command], CommandHandler[_ <: Command]] =
     handlers.map(handler => (handler.handledClass, handler)).toMap
 
-  private def tryDispatch(c: Command): SMono[Void] = {
+  private def tryDispatch(c: Command): SMono[util.List[_ <: Event]] = {
     handleCommand(c) match {
       case Some(eventsPublisher) =>
         SMono(eventsPublisher)
-          .flatMap(events => eventBus.publish(events.asScala))
+          .flatMap(events => eventBus.publish(events.asScala).`then`(SMono.just(events)))
       case _ =>
         SMono.raiseError(CommandDispatcher.UnknownCommandException(c))
     }
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
index 5cc52f2..804aa17 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
@@ -18,6 +18,8 @@
  ****************************************************************/
 package org.apache.james.eventsourcing
 
+import java.util
+
 import org.apache.james.eventsourcing.eventstore.EventStore
 import org.reactivestreams.Publisher
 
@@ -37,5 +39,5 @@ class EventSourcingSystem(handlers: Set[CommandHandler[_ <: Command]],
   private val eventBus = new EventBus(eventStore, subscribers)
   private val commandDispatcher = new CommandDispatcher(eventBus, handlers)
 
-  def dispatch(c: Command): Publisher[Void] = commandDispatcher.dispatch(c)
+  def dispatch(c: Command): Publisher[util.List[_ <: Event]] = commandDispatcher.dispatch(c)
 }
\ No newline at end of file
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
index d705740..7c9760c 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
@@ -89,6 +89,7 @@ public class QuotaThresholdCrossingListener implements MailboxListener.ReactiveG
 
     private Mono<Void> handleEvent(Username username, QuotaUsageUpdatedEvent event) {
         return Mono.from(eventSourcingSystem.dispatch(
-            new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant())));
+                new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant())))
+            .then();
     }
 }
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 662fad1..485c194 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -54,7 +54,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
 
     @Override
     public Publisher<Void> defineRulesForUser(Username username, List<Rule> rules) {
-        return eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules));
+        return Mono.from(eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules))).then();
     }
 
     @Override
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index 1edbf03..721792b 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -27,30 +27,29 @@ import org.apache.james.task.Task.Result
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker}
 import org.reactivestreams.Publisher
-
 import reactor.core.scala.publisher.SMono
 
 import scala.compat.java8.OptionConverters._
 
 case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
 
-  override def started(taskId: TaskId): Publisher[Void] = eventSourcingSystem.dispatch(Start(taskId))
+  override def started(taskId: TaskId): Publisher[Void] = SMono(eventSourcingSystem.dispatch(Start(taskId))).`then`()
 
   override def completed(taskId: TaskId, result: Result, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
-    eventSourcingSystem.dispatch(Complete(taskId, result, additionalInformation.asScala))
+    SMono(eventSourcingSystem.dispatch(Complete(taskId, result, additionalInformation.asScala))).`then`()
 
   override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: Throwable): Publisher[Void] =
-    eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, Some(errorMessage), Some(Throwables.getStackTraceAsString(t))))
+    SMono(eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, Some(errorMessage), Some(Throwables.getStackTraceAsString(t))))).`then`()
 
   override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): Publisher[Void] =
-    eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, Some(Throwables.getStackTraceAsString(t))))
+    SMono(eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, Some(Throwables.getStackTraceAsString(t))))).`then`()
 
   override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
-    eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, None))
+    SMono(eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, None))).`then`()
 
   override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
-    eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala ))
+    SMono(eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala ))).`then`()
 
   override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Publisher[Void] =
-    eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation))
+    SMono(eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation))).`then`()
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org