You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/22 15:09:52 UTC

[james-project] 02/05: JAMES-3777 EventSourcingFilteringManagement: allow to customize the read projection

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 776dd4cb6d63044b2cd62398448638b1345c106e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 13:46:40 2023 +0700

    JAMES-3777 EventSourcingFilteringManagement: allow to customize the read projection
---
 .../impl/EventSourcingFilteringManagement.java     | 74 ++++++++++++++++------
 1 file changed, 55 insertions(+), 19 deletions(-)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 8dfda3d241..8061428e2c 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -19,12 +19,14 @@
 
 package org.apache.james.jmap.api.filtering.impl;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.eventsourcing.eventstore.EventStore;
@@ -42,19 +44,67 @@ import com.google.common.collect.ImmutableSet;
 import reactor.core.publisher.Mono;
 
 public class EventSourcingFilteringManagement implements FilteringManagement {
+    public interface ReadProjection {
+        Publisher<Rules> listRulesForUser(Username username);
+
+        Publisher<Version> getLatestVersion(Username username);
+
+        Optional<Subscriber> subscriber();
+    }
+
+    public static class NoReadProjection implements ReadProjection {
+        private final EventStore eventStore;
+
+        @Inject
+        public NoReadProjection(EventStore eventStore) {
+            this.eventStore = eventStore;
+        }
+
+        @Override
+        public Publisher<Rules> listRulesForUser(Username username) {
+            Preconditions.checkNotNull(username);
+
+            FilteringAggregateId aggregateId = new FilteringAggregateId(username);
+
+            return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+                .map(history -> FilteringAggregate.load(aggregateId, history).listRules())
+                .defaultIfEmpty(new Rules(ImmutableList.of(), Version.INITIAL));
+        }
+
+        @Override
+        public Publisher<Version> getLatestVersion(Username username) {
+            Preconditions.checkNotNull(username);
+
+            FilteringAggregateId aggregateId = new FilteringAggregateId(username);
+
+            return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+                .map(History::getVersionAsJava)
+                .map(eventIdOptional -> eventIdOptional.map(eventId -> new Version(eventId.value()))
+                    .orElse(Version.INITIAL));
+        }
+
+        @Override
+        public Optional<Subscriber> subscriber() {
+            return Optional.empty();
+        }
+    }
 
     private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = ImmutableSet.of();
 
-    private final EventStore eventStore;
+    private final ReadProjection readProjection;
     private final EventSourcingSystem eventSourcingSystem;
 
     @Inject
     public EventSourcingFilteringManagement(EventStore eventStore) {
+        this(eventStore, new NoReadProjection(eventStore));
+    }
+
+    public EventSourcingFilteringManagement(EventStore eventStore, ReadProjection readProjection) {
+        this.readProjection = readProjection; // use the supplied projection: reads must go to the projection whose subscriber is registered below
         this.eventSourcingSystem = EventSourcingSystem.fromJava(
             ImmutableSet.of(new DefineRulesCommandHandler(eventStore)),
-            NO_SUBSCRIBER,
+            readProjection.subscriber().map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
             eventStore);
-        this.eventStore = eventStore;
     }
 
     @Override
@@ -68,25 +118,11 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
 
     @Override
     public Publisher<Rules> listRulesForUser(Username username) {
-        Preconditions.checkNotNull(username);
-
-        FilteringAggregateId aggregateId = new FilteringAggregateId(username);
-
-        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
-            .map(history -> FilteringAggregate.load(aggregateId, history).listRules())
-            .defaultIfEmpty(new Rules(ImmutableList.of(), Version.INITIAL));
+        return readProjection.listRulesForUser(username);
     }
 
     @Override
     public Publisher<Version> getLatestVersion(Username username) {
-        Preconditions.checkNotNull(username);
-
-        FilteringAggregateId aggregateId = new FilteringAggregateId(username);
-
-        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
-            .map(History::getVersionAsJava)
-            .map(eventIdOptional -> eventIdOptional.map(eventId -> new Version(eventId.value()))
-                .orElse(Version.INITIAL));
+        return readProjection.getLatestVersion(username);
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org