You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/22 15:09:55 UTC

[james-project] 05/05: JAMES-3777 Populate filtering projection upon Incremental change

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a133d0771ab48f36171294a8640877790fc00969
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 21 14:39:13 2023 +0700

    JAMES-3777 Populate filtering projection upon Incremental change
---
 .../filtering/CassandraFilteringProjection.java    | 59 ++++++++++++++--------
 .../impl/EventSourcingFilteringManagement.java     |  8 +--
 .../api/filtering/impl/FilteringAggregateId.java   |  4 ++
 .../data/jmap/PopulateFilteringProjectionTask.java |  2 +-
 ...pulateFilteringProjectionRequestToTaskTest.java |  2 +-
 5 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
index 2841a99878..486dddf655 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
@@ -10,17 +10,22 @@ import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjec
 
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.AggregateId;
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventId;
 import org.apache.james.eventsourcing.ReactiveSubscriber;
+import org.apache.james.jmap.api.filtering.Rule;
 import org.apache.james.jmap.api.filtering.Rules;
 import org.apache.james.jmap.api.filtering.Version;
 import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
 import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
+import org.apache.james.jmap.api.filtering.impl.IncrementalRuleChange;
 import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
 import org.reactivestreams.Publisher;
 
@@ -30,10 +35,11 @@ import com.datastax.oss.driver.api.core.cql.Row;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Mono;
 
-public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber {
+public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection {
     private final CassandraAsyncExecutor executor;
 
     private final PreparedStatement insertStatement;
@@ -82,27 +88,40 @@ public class CassandraFilteringProjection implements EventSourcingFilteringManag
     }
 
     @Override
-    public Publisher<Void> handleReactive(Event event) {
-        if (event instanceof RuleSetDefined) {
-            return persist((RuleSetDefined) event);
-        }
-        throw new RuntimeException("Unsupported event");
-    }
+    public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
+        return Optional.of(new ReactiveSubscriber() {
+            @Override
+            public Publisher<Void> handleReactive(Event event) {
+                if (event instanceof RuleSetDefined) {
+                    return persist((RuleSetDefined) event);
+                }
+                if (event instanceof IncrementalRuleChange) {
+                    return persist((IncrementalRuleChange) event);
+                }
+                throw new RuntimeException("Unsupported event");
+            }
 
-    @Override
-    public Optional<ReactiveSubscriber> subscriber() {
-        return Optional.of(this);
-    }
+            private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
+                return persistRules(ruleSetDefined.getAggregateId(), ruleSetDefined.eventId(), ruleSetDefined.getRules());
+            }
 
-    private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
-        try {
-            return executor.executeVoid(insertStatement.bind()
-                .setString(AGGREGATE_ID, ruleSetDefined.getAggregateId().asAggregateKey())
-                .setInt(EVENT_ID, ruleSetDefined.eventId().value())
-                .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(ruleSetDefined.getRules()))));
-        } catch (JsonProcessingException e) {
-            return Mono.error(e);
-        }
+            private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
+                try {
+                    return executor.executeVoid(insertStatement.bind()
+                        .setString(AGGREGATE_ID, aggregateId.asAggregateKey())
+                        .setInt(EVENT_ID, eventId.value())
+                        .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules))));
+                } catch (JsonProcessingException e) {
+                    return Mono.error(e);
+                }
+            }
+
+            private Mono<Void> persist(IncrementalRuleChange incrementalRuleChange) {
+                FilteringAggregateId filteringAggregateId = (FilteringAggregateId) incrementalRuleChange.getAggregateId();
+                return Mono.from(ruleLoader.apply(filteringAggregateId.getUsername()))
+                    .flatMap(rules -> persistRules(filteringAggregateId, incrementalRuleChange.eventId(), ImmutableList.copyOf(rules.getRules())));
+            }
+        });
     }
 
     private Version parseVersion(Row row) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 402c532c6c..08ac0d372f 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -22,6 +22,7 @@ package org.apache.james.jmap.api.filtering.impl;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -50,7 +51,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
 
         Publisher<Version> getLatestVersion(Username username);
 
-        Optional<ReactiveSubscriber> subscriber();
+        Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader);
     }
 
     public static class NoReadProjection implements ReadProjection {
@@ -85,7 +86,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
         }
 
         @Override
-        public Optional<ReactiveSubscriber> subscriber() {
+        public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
             return Optional.empty();
         }
     }
@@ -104,7 +105,8 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
         this.readProjection = new NoReadProjection(eventStore);
         this.eventSourcingSystem = EventSourcingSystem.fromJava(
             ImmutableSet.of(new DefineRulesCommandHandler(eventStore)),
-            readProjection.subscriber().map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
+            readProjection.subscriber(aggregateId -> new NoReadProjection(eventStore).listRulesForUser(aggregateId))
+                .map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
             eventStore);
     }
 
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
index 3d20912b20..a43c10ecfa 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
@@ -49,6 +49,10 @@ public class FilteringAggregateId implements AggregateId {
         return PREFIX + SEPARATOR + username.asString();
     }
 
+    public Username getUsername() {
+        return username;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof FilteringAggregateId) {
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
index fb408500fe..3d0bc6d2c5 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
@@ -133,7 +133,7 @@ public class PopulateFilteringProjectionTask implements Task {
             .concatMap(user -> Mono.from(noReadProjection.listRulesForUser(user))
                 .flatMap(rules ->
                     rules.getVersion().asEventId()
-                        .flatMap(eventId -> readProjection.subscriber()
+                        .flatMap(eventId -> readProjection.subscriber(any -> Mono.empty())
                             .map(s -> Mono.from(s.handleReactive(asEvent(user, rules, eventId)))))
                         .orElse(Mono.empty()))
                 .thenReturn(Result.COMPLETED)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
index 58b5fc547d..337d63f965 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
@@ -225,7 +225,7 @@ class PopulateFilteringProjectionRequestToTaskTest {
         Mockito.when(noReadProjection.listRulesForUser(any()))
             .thenReturn(Mono.just(new Rules(ImmutableList.of(rule), new Version(4))));
         ReactiveSubscriber subscriber = mock(ReactiveSubscriber.class);
-        Mockito.when(readProjection.subscriber()).thenReturn(Optional.of(subscriber));
+        Mockito.when(readProjection.subscriber(any())).thenReturn(Optional.of(subscriber));
         Mockito.when(subscriber.handleReactive(any())).thenReturn(Mono.empty());
 
         String taskId = with()


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org