You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/12/07 03:57:36 UTC
[james-project] 06/13: JAMES-2393 Dispatching an EventSourcing command can return the generated events
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 9449c8353182e4634bfd1bf5f586fdb2697558d5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Dec 4 12:00:20 2020 +0700
JAMES-2393 Dispatching an EventSourcing command can return the generated events
This enables reusing the state of the transaction.
---
.../apache/james/eventsourcing/CommandDispatcher.scala | 10 ++++------
.../apache/james/eventsourcing/EventSourcingSystem.scala | 4 +++-
.../mailing/listeners/QuotaThresholdCrossingListener.java | 3 ++-
.../filtering/impl/EventSourcingFilteringManagement.java | 2 +-
.../james/task/eventsourcing/WorkerStatusListener.scala | 15 +++++++--------
5 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
index 05abc6f..a547bc4 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
@@ -20,12 +20,10 @@ package org.apache.james.eventsourcing
import java.util
+import com.google.common.base.Preconditions
import javax.inject.Inject
-
import org.apache.james.eventsourcing.eventstore.EventStoreFailedException
import org.reactivestreams.Publisher
-
-import com.google.common.base.Preconditions
import reactor.core.scala.publisher.SMono
import scala.jdk.CollectionConverters._
@@ -51,7 +49,7 @@ object CommandDispatcher {
class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandler[_ <: Command]]) {
Preconditions.checkArgument(hasOnlyOneHandlerByCommand(handlers), CommandDispatcher.ONLY_ONE_HANDLER_PRECONDITION)
- def dispatch(c: Command): Publisher[Void] = {
+ def dispatch(c: Command): Publisher[util.List[_ <: Event]] = {
tryDispatch(c)
.retry(CommandDispatcher.MAX_RETRY, {
case _: EventStoreFailedException => true
@@ -71,11 +69,11 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl
private val handlersByClass: Map[Class[_ <: Command], CommandHandler[_ <: Command]] =
handlers.map(handler => (handler.handledClass, handler)).toMap
- private def tryDispatch(c: Command): SMono[Void] = {
+ private def tryDispatch(c: Command): SMono[util.List[_ <: Event]] = {
handleCommand(c) match {
case Some(eventsPublisher) =>
SMono(eventsPublisher)
- .flatMap(events => eventBus.publish(events.asScala))
+ .flatMap(events => eventBus.publish(events.asScala).`then`(SMono.just(events)))
case _ =>
SMono.raiseError(CommandDispatcher.UnknownCommandException(c))
}
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
index 5cc52f2..804aa17 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventSourcingSystem.scala
@@ -18,6 +18,8 @@
****************************************************************/
package org.apache.james.eventsourcing
+import java.util
+
import org.apache.james.eventsourcing.eventstore.EventStore
import org.reactivestreams.Publisher
@@ -37,5 +39,5 @@ class EventSourcingSystem(handlers: Set[CommandHandler[_ <: Command]],
private val eventBus = new EventBus(eventStore, subscribers)
private val commandDispatcher = new CommandDispatcher(eventBus, handlers)
- def dispatch(c: Command): Publisher[Void] = commandDispatcher.dispatch(c)
+ def dispatch(c: Command): Publisher[util.List[_ <: Event]] = commandDispatcher.dispatch(c)
}
\ No newline at end of file
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
index d705740..7c9760c 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/listeners/QuotaThresholdCrossingListener.java
@@ -89,6 +89,7 @@ public class QuotaThresholdCrossingListener implements MailboxListener.ReactiveG
private Mono<Void> handleEvent(Username username, QuotaUsageUpdatedEvent event) {
return Mono.from(eventSourcingSystem.dispatch(
- new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant())));
+ new DetectThresholdCrossing(username, event.getCountQuota(), event.getSizeQuota(), event.getInstant())))
+ .then();
}
}
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 662fad1..485c194 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -54,7 +54,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
@Override
public Publisher<Void> defineRulesForUser(Username username, List<Rule> rules) {
- return eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules));
+ return Mono.from(eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules))).then();
}
@Override
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
index 1edbf03..721792b 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/WorkerStatusListener.scala
@@ -27,30 +27,29 @@ import org.apache.james.task.Task.Result
import org.apache.james.task.eventsourcing.TaskCommand._
import org.apache.james.task.{TaskExecutionDetails, TaskId, TaskManagerWorker}
import org.reactivestreams.Publisher
-
import reactor.core.scala.publisher.SMono
import scala.compat.java8.OptionConverters._
case class WorkerStatusListener(eventSourcingSystem: EventSourcingSystem) extends TaskManagerWorker.Listener {
- override def started(taskId: TaskId): Publisher[Void] = eventSourcingSystem.dispatch(Start(taskId))
+ override def started(taskId: TaskId): Publisher[Void] = SMono(eventSourcingSystem.dispatch(Start(taskId))).`then`()
override def completed(taskId: TaskId, result: Result, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
- eventSourcingSystem.dispatch(Complete(taskId, result, additionalInformation.asScala))
+ SMono(eventSourcingSystem.dispatch(Complete(taskId, result, additionalInformation.asScala))).`then`()
override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], errorMessage: String, t: Throwable): Publisher[Void] =
- eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, Some(errorMessage), Some(Throwables.getStackTraceAsString(t))))
+ SMono(eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, Some(errorMessage), Some(Throwables.getStackTraceAsString(t))))).`then`()
override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation], t: Throwable): Publisher[Void] =
- eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, Some(Throwables.getStackTraceAsString(t))))
+ SMono(eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, Some(Throwables.getStackTraceAsString(t))))).`then`()
override def failed(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
- eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, None))
+ SMono(eventSourcingSystem.dispatch(Fail(taskId, additionalInformation.asScala, None, None))).`then`()
override def cancelled(taskId: TaskId, additionalInformation: Optional[TaskExecutionDetails.AdditionalInformation]): Publisher[Void] =
- eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala ))
+ SMono(eventSourcingSystem.dispatch(Cancel(taskId, additionalInformation.asScala ))).`then`()
override def updated(taskId: TaskId, additionalInformation: TaskExecutionDetails.AdditionalInformation): Publisher[Void] =
- eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation))
+ SMono(eventSourcingSystem.dispatch(UpdateAdditionalInformation(taskId, additionalInformation))).`then`()
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org