You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/22 15:09:55 UTC
[james-project] 05/05: JAMES-3777 Populate filtering projection upon Incremental change
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit a133d0771ab48f36171294a8640877790fc00969
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 21 14:39:13 2023 +0700
JAMES-3777 Populate filtering projection upon Incremental change
---
.../filtering/CassandraFilteringProjection.java | 59 ++++++++++++++--------
.../impl/EventSourcingFilteringManagement.java | 8 +--
.../api/filtering/impl/FilteringAggregateId.java | 4 ++
.../data/jmap/PopulateFilteringProjectionTask.java | 2 +-
...pulateFilteringProjectionRequestToTaskTest.java | 2 +-
5 files changed, 50 insertions(+), 25 deletions(-)
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
index 2841a99878..486dddf655 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
@@ -10,17 +10,22 @@ import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjec
import java.util.List;
import java.util.Optional;
+import java.util.function.Function;
import javax.inject.Inject;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventId;
import org.apache.james.eventsourcing.ReactiveSubscriber;
+import org.apache.james.jmap.api.filtering.Rule;
import org.apache.james.jmap.api.filtering.Rules;
import org.apache.james.jmap.api.filtering.Version;
import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
+import org.apache.james.jmap.api.filtering.impl.IncrementalRuleChange;
import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
import org.reactivestreams.Publisher;
@@ -30,10 +35,11 @@ import com.datastax.oss.driver.api.core.cql.Row;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Mono;
-public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber {
+public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection {
private final CassandraAsyncExecutor executor;
private final PreparedStatement insertStatement;
@@ -82,27 +88,40 @@ public class CassandraFilteringProjection implements EventSourcingFilteringManag
}
@Override
- public Publisher<Void> handleReactive(Event event) {
- if (event instanceof RuleSetDefined) {
- return persist((RuleSetDefined) event);
- }
- throw new RuntimeException("Unsupported event");
- }
+ public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
+ return Optional.of(new ReactiveSubscriber() {
+ @Override
+ public Publisher<Void> handleReactive(Event event) {
+ if (event instanceof RuleSetDefined) {
+ return persist((RuleSetDefined) event);
+ }
+ if (event instanceof IncrementalRuleChange) {
+ return persist((IncrementalRuleChange) event);
+ }
+ throw new RuntimeException("Unsupported event");
+ }
- @Override
- public Optional<ReactiveSubscriber> subscriber() {
- return Optional.of(this);
- }
+ private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
+ return persistRules(ruleSetDefined.getAggregateId(), ruleSetDefined.eventId(), ruleSetDefined.getRules());
+ }
- private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
- try {
- return executor.executeVoid(insertStatement.bind()
- .setString(AGGREGATE_ID, ruleSetDefined.getAggregateId().asAggregateKey())
- .setInt(EVENT_ID, ruleSetDefined.eventId().value())
- .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(ruleSetDefined.getRules()))));
- } catch (JsonProcessingException e) {
- return Mono.error(e);
- }
+ private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
+ try {
+ return executor.executeVoid(insertStatement.bind()
+ .setString(AGGREGATE_ID, aggregateId.asAggregateKey())
+ .setInt(EVENT_ID, eventId.value())
+ .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules))));
+ } catch (JsonProcessingException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<Void> persist(IncrementalRuleChange incrementalRuleChange) {
+ FilteringAggregateId filteringAggregateId = (FilteringAggregateId) incrementalRuleChange.getAggregateId();
+ return Mono.from(ruleLoader.apply(filteringAggregateId.getUsername()))
+ .flatMap(rules -> persistRules(filteringAggregateId, incrementalRuleChange.eventId(), ImmutableList.copyOf(rules.getRules())));
+ }
+ });
}
private Version parseVersion(Row row) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 402c532c6c..08ac0d372f 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -22,6 +22,7 @@ package org.apache.james.jmap.api.filtering.impl;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
+import java.util.function.Function;
import javax.inject.Inject;
@@ -50,7 +51,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
Publisher<Version> getLatestVersion(Username username);
- Optional<ReactiveSubscriber> subscriber();
+ Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader);
}
public static class NoReadProjection implements ReadProjection {
@@ -85,7 +86,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
}
@Override
- public Optional<ReactiveSubscriber> subscriber() {
+ public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
return Optional.empty();
}
}
@@ -104,7 +105,8 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
this.readProjection = new NoReadProjection(eventStore);
this.eventSourcingSystem = EventSourcingSystem.fromJava(
ImmutableSet.of(new DefineRulesCommandHandler(eventStore)),
- readProjection.subscriber().map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
+ readProjection.subscriber(aggregateId -> new NoReadProjection(eventStore).listRulesForUser(aggregateId))
+ .map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
eventStore);
}
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
index 3d20912b20..a43c10ecfa 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
@@ -49,6 +49,10 @@ public class FilteringAggregateId implements AggregateId {
return PREFIX + SEPARATOR + username.asString();
}
+ public Username getUsername() {
+ return username;
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof FilteringAggregateId) {
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
index fb408500fe..3d0bc6d2c5 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
@@ -133,7 +133,7 @@ public class PopulateFilteringProjectionTask implements Task {
.concatMap(user -> Mono.from(noReadProjection.listRulesForUser(user))
.flatMap(rules ->
rules.getVersion().asEventId()
- .flatMap(eventId -> readProjection.subscriber()
+ .flatMap(eventId -> readProjection.subscriber(any -> Mono.empty())
.map(s -> Mono.from(s.handleReactive(asEvent(user, rules, eventId)))))
.orElse(Mono.empty()))
.thenReturn(Result.COMPLETED)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
index 58b5fc547d..337d63f965 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
@@ -225,7 +225,7 @@ class PopulateFilteringProjectionRequestToTaskTest {
Mockito.when(noReadProjection.listRulesForUser(any()))
.thenReturn(Mono.just(new Rules(ImmutableList.of(rule), new Version(4))));
ReactiveSubscriber subscriber = mock(ReactiveSubscriber.class);
- Mockito.when(readProjection.subscriber()).thenReturn(Optional.of(subscriber));
+ Mockito.when(readProjection.subscriber(any())).thenReturn(Optional.of(subscriber));
Mockito.when(subscriber.handleReactive(any())).thenReturn(Mono.empty());
String taskId = with()
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org