You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/14 06:42:43 UTC

[james-project] 11/15: JAMES-2393 Allow writing reactive eventSourcing subscribers

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6bc003b2340d5d49798fefd97d849543cd47a82f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 8 16:48:14 2021 +0700

    JAMES-2393 Allow writing reactive eventSourcing subscribers
---
 .../org/apache/james/eventsourcing/EventBus.scala  | 25 +++++++---------------
 .../apache/james/eventsourcing/Subscriber.scala    | 25 ++++++++++++++++++++++
 .../mail/eventsourcing/acl/AclV2DAOSubscriber.java | 12 ++++++-----
 .../eventsourcing/acl/UserRightsDAOSubscriber.java | 13 ++++++-----
 4 files changed, 48 insertions(+), 27 deletions(-)

diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
index ca8d753..e932252 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
@@ -19,11 +19,9 @@
 package org.apache.james.eventsourcing
 
 import javax.inject.Inject
-
 import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException}
 import org.reactivestreams.Publisher
 import org.slf4j.LoggerFactory
-
 import reactor.core.scala.publisher.{SFlux, SMono}
 
 object EventBus {
@@ -32,27 +30,20 @@ object EventBus {
 
 class EventBus @Inject() (eventStore: EventStore, subscribers: Set[Subscriber]) {
   @throws[EventStoreFailedException]
-  def publish(events: Iterable[Event]): SMono[Void] = {
+  def publish(events: Iterable[Event]): SMono[Void] =
     SMono(eventStore.appendAll(events))
         .`then`(runHandlers(events, subscribers))
 
-  }
-
-  def runHandlers(events: Iterable[Event], subscribers: Set[Subscriber]): SMono[Void] = {
+  def runHandlers(events: Iterable[Event], subscribers: Set[Subscriber]): SMono[Void] =
     SFlux.fromIterable(events.flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber))))
-      .flatMap(infos => runHandler(infos._1, infos._2))
+      .flatMapSequential(infos => runHandler(infos._1, infos._2))
       .`then`()
       .`then`(SMono.empty)
-  }
-
-  def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] = SMono.fromCallable(() => handle(event, subscriber)).`then`(SMono.empty)
 
-  private def handle(event : Event, subscriber: Subscriber) : Unit = {
-    try {
-      subscriber.handle(event)
-    } catch {
-      case e: Exception =>
+  def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] =
+    SMono(ReactiveSubscriber.asReactiveSubscriber(subscriber).handleReactive(event))
+      .onErrorResume(e => {
         EventBus.LOGGER.error("Error while calling {} for {}", subscriber, event, e)
-    }
-  }
+        SMono.empty
+      })
 }
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
index ad6cf0a..3b14fef 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
@@ -18,6 +18,31 @@
  * ***************************************************************/
 package org.apache.james.eventsourcing
 
+import org.reactivestreams.Publisher
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+
 trait Subscriber {
   def handle(event: Event) : Unit
+}
+
+trait ReactiveSubscriber extends Subscriber {
+  def handleReactive(event: Event): Publisher[Void]
+
+  override def handle(event: Event) : Unit = SMono(handleReactive(event)).block()
+}
+
+object ReactiveSubscriber {
+  def asReactiveSubscriber(subscriber: Subscriber): ReactiveSubscriber = subscriber match {
+    case reactive: ReactiveSubscriber => reactive
+    case nonReactive => new ReactiveSubscriberWrapper(nonReactive)
+  }
+}
+
+class ReactiveSubscriberWrapper(delegate: Subscriber) extends ReactiveSubscriber {
+  override def handle(event: Event) : Unit = delegate.handle(event)
+
+  def handleReactive(event: Event): Publisher[Void] = SMono.fromCallable(() => handle(event))
+    .subscribeOn(Schedulers.elastic())
+    .`then`()
 }
\ No newline at end of file
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
index 6c579ea..1b3ecad 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
@@ -20,12 +20,13 @@
 package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
 
 import org.apache.james.eventsourcing.Event;
-import org.apache.james.eventsourcing.Subscriber;
+import org.apache.james.eventsourcing.ReactiveSubscriber;
 import org.apache.james.mailbox.cassandra.mail.CassandraACLDAOV2;
 
 import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
 
-public class AclV2DAOSubscriber implements Subscriber {
+public class AclV2DAOSubscriber implements ReactiveSubscriber {
     private final CassandraACLDAOV2 acldaov2;
 
     public AclV2DAOSubscriber(CassandraACLDAOV2 acldaov2) {
@@ -33,15 +34,16 @@ public class AclV2DAOSubscriber implements Subscriber {
     }
 
     @Override
-    public void handle(Event event) {
+    public Mono<Void> handleReactive(Event event) {
         if (event instanceof ACLUpdated) {
             ACLUpdated aclUpdated = (ACLUpdated) event;
 
-            Flux.fromStream(
+            return Flux.fromStream(
                 aclUpdated.getAclDiff()
                     .commands())
                 .flatMap(command -> acldaov2.updateACL(aclUpdated.mailboxId(), command))
-                .blockLast();
+                .then();
         }
+        return Mono.empty();
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
index 4bdfb19..32eec22 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
@@ -20,10 +20,13 @@
 package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
 
 import org.apache.james.eventsourcing.Event;
-import org.apache.james.eventsourcing.Subscriber;
+import org.apache.james.eventsourcing.ReactiveSubscriber;
 import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
+import org.reactivestreams.Publisher;
 
-public class UserRightsDAOSubscriber implements Subscriber {
+import reactor.core.publisher.Mono;
+
+public class UserRightsDAOSubscriber implements ReactiveSubscriber {
     private final CassandraUserMailboxRightsDAO userRightsDAO;
 
     public UserRightsDAOSubscriber(CassandraUserMailboxRightsDAO userRightsDAO) {
@@ -31,11 +34,11 @@ public class UserRightsDAOSubscriber implements Subscriber {
     }
 
     @Override
-    public void handle(Event event) {
+    public Publisher<Void> handleReactive(Event event) {
         if (event instanceof ACLUpdated) {
             ACLUpdated aclUpdated = (ACLUpdated) event;
-            userRightsDAO.update(aclUpdated.mailboxId(), aclUpdated.getAclDiff())
-                .block();
+            return userRightsDAO.update(aclUpdated.mailboxId(), aclUpdated.getAclDiff());
         }
+        return Mono.empty();
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org