You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/05/14 06:42:43 UTC
[james-project] 11/15: JAMES-2393 Allow writing reactive eventSourcing subscribers
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 6bc003b2340d5d49798fefd97d849543cd47a82f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat May 8 16:48:14 2021 +0700
JAMES-2393 Allow writing reactive eventSourcing subscribers
---
.../org/apache/james/eventsourcing/EventBus.scala | 25 +++++++---------------
.../apache/james/eventsourcing/Subscriber.scala | 25 ++++++++++++++++++++++
.../mail/eventsourcing/acl/AclV2DAOSubscriber.java | 12 ++++++-----
.../eventsourcing/acl/UserRightsDAOSubscriber.java | 13 ++++++-----
4 files changed, 48 insertions(+), 27 deletions(-)
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
index ca8d753..e932252 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
@@ -19,11 +19,9 @@
package org.apache.james.eventsourcing
import javax.inject.Inject
-
import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException}
import org.reactivestreams.Publisher
import org.slf4j.LoggerFactory
-
import reactor.core.scala.publisher.{SFlux, SMono}
object EventBus {
@@ -32,27 +30,20 @@ object EventBus {
class EventBus @Inject() (eventStore: EventStore, subscribers: Set[Subscriber]) {
@throws[EventStoreFailedException]
- def publish(events: Iterable[Event]): SMono[Void] = {
+ def publish(events: Iterable[Event]): SMono[Void] =
SMono(eventStore.appendAll(events))
.`then`(runHandlers(events, subscribers))
- }
-
- def runHandlers(events: Iterable[Event], subscribers: Set[Subscriber]): SMono[Void] = {
+ def runHandlers(events: Iterable[Event], subscribers: Set[Subscriber]): SMono[Void] =
SFlux.fromIterable(events.flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber))))
- .flatMap(infos => runHandler(infos._1, infos._2))
+ .flatMapSequential(infos => runHandler(infos._1, infos._2))
.`then`()
.`then`(SMono.empty)
- }
-
- def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] = SMono.fromCallable(() => handle(event, subscriber)).`then`(SMono.empty)
- private def handle(event : Event, subscriber: Subscriber) : Unit = {
- try {
- subscriber.handle(event)
- } catch {
- case e: Exception =>
+ def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] =
+ SMono(ReactiveSubscriber.asReactiveSubscriber(subscriber).handleReactive(event))
+ .onErrorResume(e => {
EventBus.LOGGER.error("Error while calling {} for {}", subscriber, event, e)
- }
- }
+ SMono.empty
+ })
}
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
index ad6cf0a..3b14fef 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
@@ -18,6 +18,31 @@
* ***************************************************************/
package org.apache.james.eventsourcing
+import org.reactivestreams.Publisher
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+
trait Subscriber {
def handle(event: Event) : Unit
+}
+
+trait ReactiveSubscriber extends Subscriber {
+ def handleReactive(event: Event): Publisher[Void]
+
+ override def handle(event: Event) : Unit = SMono(handleReactive(event)).block()
+}
+
+object ReactiveSubscriber {
+ def asReactiveSubscriber(subscriber: Subscriber): ReactiveSubscriber = subscriber match {
+ case reactive: ReactiveSubscriber => reactive
+ case nonReactive => new ReactiveSubscriberWrapper(nonReactive)
+ }
+}
+
+class ReactiveSubscriberWrapper(delegate: Subscriber) extends ReactiveSubscriber {
+ override def handle(event: Event) : Unit = delegate.handle(event)
+
+ def handleReactive(event: Event): Publisher[Void] = SMono.fromCallable(() => handle(event))
+ .subscribeOn(Schedulers.elastic())
+ .`then`()
}
\ No newline at end of file
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
index 6c579ea..1b3ecad 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
@@ -20,12 +20,13 @@
package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
import org.apache.james.eventsourcing.Event;
-import org.apache.james.eventsourcing.Subscriber;
+import org.apache.james.eventsourcing.ReactiveSubscriber;
import org.apache.james.mailbox.cassandra.mail.CassandraACLDAOV2;
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
-public class AclV2DAOSubscriber implements Subscriber {
+public class AclV2DAOSubscriber implements ReactiveSubscriber {
private final CassandraACLDAOV2 acldaov2;
public AclV2DAOSubscriber(CassandraACLDAOV2 acldaov2) {
@@ -33,15 +34,16 @@ public class AclV2DAOSubscriber implements Subscriber {
}
@Override
- public void handle(Event event) {
+ public Mono<Void> handleReactive(Event event) {
if (event instanceof ACLUpdated) {
ACLUpdated aclUpdated = (ACLUpdated) event;
- Flux.fromStream(
+ return Flux.fromStream(
aclUpdated.getAclDiff()
.commands())
.flatMap(command -> acldaov2.updateACL(aclUpdated.mailboxId(), command))
- .blockLast();
+ .then();
}
+ return Mono.empty();
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
index 4bdfb19..32eec22 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
@@ -20,10 +20,13 @@
package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
import org.apache.james.eventsourcing.Event;
-import org.apache.james.eventsourcing.Subscriber;
+import org.apache.james.eventsourcing.ReactiveSubscriber;
import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
+import org.reactivestreams.Publisher;
-public class UserRightsDAOSubscriber implements Subscriber {
+import reactor.core.publisher.Mono;
+
+public class UserRightsDAOSubscriber implements ReactiveSubscriber {
private final CassandraUserMailboxRightsDAO userRightsDAO;
public UserRightsDAOSubscriber(CassandraUserMailboxRightsDAO userRightsDAO) {
@@ -31,11 +34,11 @@ public class UserRightsDAOSubscriber implements Subscriber {
}
@Override
- public void handle(Event event) {
+ public Publisher<Void> handleReactive(Event event) {
if (event instanceof ACLUpdated) {
ACLUpdated aclUpdated = (ACLUpdated) event;
- userRightsDAO.update(aclUpdated.mailboxId(), aclUpdated.getAclDiff())
- .block();
+ return userRightsDAO.update(aclUpdated.mailboxId(), aclUpdated.getAclDiff());
}
+ return Mono.empty();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org