You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/22 15:09:52 UTC
[james-project] 02/05: JAMES-3777 EventSourcingFilteringManagement: allow to customize the read projection
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 776dd4cb6d63044b2cd62398448638b1345c106e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 13:46:40 2023 +0700
JAMES-3777 EventSourcingFilteringManagement: allow to customize the read projection
---
.../impl/EventSourcingFilteringManagement.java | 74 ++++++++++++++++------
1 file changed, 55 insertions(+), 19 deletions(-)
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 8dfda3d241..8061428e2c 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -19,12 +19,14 @@
package org.apache.james.jmap.api.filtering.impl;
+import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventSourcingSystem;
import org.apache.james.eventsourcing.Subscriber;
import org.apache.james.eventsourcing.eventstore.EventStore;
@@ -42,19 +44,67 @@ import com.google.common.collect.ImmutableSet;
import reactor.core.publisher.Mono;
public class EventSourcingFilteringManagement implements FilteringManagement {
+ public interface ReadProjection {
+ Publisher<Rules> listRulesForUser(Username username);
+
+ Publisher<Version> getLatestVersion(Username username);
+
+ Optional<Subscriber> subscriber();
+ }
+
+ public static class NoReadProjection implements ReadProjection {
+ private final EventStore eventStore;
+
+ @Inject
+ public NoReadProjection(EventStore eventStore) {
+ this.eventStore = eventStore;
+ }
+
+ @Override
+ public Publisher<Rules> listRulesForUser(Username username) {
+ Preconditions.checkNotNull(username);
+
+ FilteringAggregateId aggregateId = new FilteringAggregateId(username);
+
+ return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+ .map(history -> FilteringAggregate.load(aggregateId, history).listRules())
+ .defaultIfEmpty(new Rules(ImmutableList.of(), Version.INITIAL));
+ }
+
+ @Override
+ public Publisher<Version> getLatestVersion(Username username) {
+ Preconditions.checkNotNull(username);
+
+ FilteringAggregateId aggregateId = new FilteringAggregateId(username);
+
+ return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+ .map(History::getVersionAsJava)
+ .map(eventIdOptional -> eventIdOptional.map(eventId -> new Version(eventId.value()))
+ .orElse(Version.INITIAL));
+ }
+
+ @Override
+ public Optional<Subscriber> subscriber() {
+ return Optional.empty();
+ }
+ }
private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = ImmutableSet.of();
- private final EventStore eventStore;
+ private final ReadProjection readProjection;
private final EventSourcingSystem eventSourcingSystem;
@Inject
public EventSourcingFilteringManagement(EventStore eventStore) {
+ this(eventStore, new NoReadProjection(eventStore));
+ }
+
+ public EventSourcingFilteringManagement(EventStore eventStore, ReadProjection readProjection) {
+ this.readProjection = readProjection;
this.eventSourcingSystem = EventSourcingSystem.fromJava(
ImmutableSet.of(new DefineRulesCommandHandler(eventStore)),
- NO_SUBSCRIBER,
+ readProjection.subscriber().map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
eventStore);
- this.eventStore = eventStore;
}
@Override
@@ -68,25 +118,11 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
@Override
public Publisher<Rules> listRulesForUser(Username username) {
- Preconditions.checkNotNull(username);
-
- FilteringAggregateId aggregateId = new FilteringAggregateId(username);
-
- return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
- .map(history -> FilteringAggregate.load(aggregateId, history).listRules())
- .defaultIfEmpty(new Rules(ImmutableList.of(), Version.INITIAL));
+ return readProjection.listRulesForUser(username);
}
@Override
public Publisher<Version> getLatestVersion(Username username) {
- Preconditions.checkNotNull(username);
-
- FilteringAggregateId aggregateId = new FilteringAggregateId(username);
-
- return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
- .map(History::getVersionAsJava)
- .map(eventIdOptional -> eventIdOptional.map(eventId -> new Version(eventId.value()))
- .orElse(Version.INITIAL));
+ return readProjection.getLatestVersion(username);
}
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org