You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/22 15:09:50 UTC

[james-project] branch master updated (334db08121 -> a133d0771a)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 334db08121 JAMES-3900 Snapshots for polled updates (#1533)
     new 199221edfc JAMES-3777 EventSourcingFilteringManagement avoid read after writes
     new 776dd4cb6d JAMES-3777 EventSourcingFilteringManagement: allow to customize the read projection
     new cafc0d9e14 JAMES-3777 Add a Cassandra projection for JMAP filters
     new d79968462c JAMES-3777 Create a webadmin exposed task to populate Cassandra filtering projection
     new a133d0771a JAMES-3777 Populate filtering projection upon Incremental change

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../docs/modules/ROOT/pages/configure/jmap.adoc    |   5 +
 .../docs/modules/ROOT/pages/operate/webadmin.adoc  |  31 +++
 .../james/modules/data/CassandraJmapModule.java    |  35 +++-
 .../james/modules/data/MemoryDataJmapModule.java   |   1 +
 .../server/JmapTaskSerializationModule.java        |  22 ++
 .../james/modules/server/JmapTasksModule.java      |   4 +
 .../filtering/CassandraFilteringProjection.java    | 137 +++++++++++++
 .../CassandraFilteringProjectionModule.java}       |  23 ++-
 ...urcingFilteringManagementNoProjectionTest.java} |  34 +++-
 ...sandraEventSourcingFilteringManagementTest.java |  32 ++-
 .../apache/james/jmap/api/filtering/Version.java   |  16 ++
 .../impl/EventSourcingFilteringManagement.java     |  85 +++++---
 .../api/filtering/impl/FilteringAggregateId.java   |   4 +
 .../api/filtering/FilteringManagementContract.java |  92 +++++----
 ...MemoryEventSourcingFilteringManagementTest.java |  15 +-
 .../WebAdminServerIntegrationImmutableTest.java    |  17 ++
 .../apache/james/webadmin/data/jmap/Constants.java |   1 +
 ... PopulateFilteringProjectionRequestToTask.java} |  14 +-
 .../data/jmap/PopulateFilteringProjectionTask.java | 164 +++++++++++++++
 ...ringProjectionTaskAdditionalInformationDTO.java |  89 ++++++++
 ...tionItemsTaskAdditionalInformationDTOTest.java} |  11 +-
 ...ulateFilteringProjectionRequestToTaskTest.java} | 223 +++++++--------------
 ...eFilteringProjectionTaskSerializationTest.java} |  21 +-
 .../populateFilters.additionalInformation.json     |   6 +
 .../test/resources/json/populateFilters.task.json  |   3 +
 src/site/markdown/server/manage-webadmin.md        |  30 +++
 src/site/xdoc/server/config-jmap.xml               |   6 +
 27 files changed, 862 insertions(+), 259 deletions(-)
 create mode 100644 server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
 copy server/{mailrepository/mailrepository-cassandra/src/main/java/org/apache/james/mailrepository/cassandra/CassandraMailRepositoryUrlModule.java => data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java} (69%)
 copy server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/{CassandraEventSourcingFilteringManagementTest.java => CassandraEventSourcingFilteringManagementNoProjectionTest.java} (54%)
 copy server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/{PopulateEmailQueryViewRequestToTask.java => PopulateFilteringProjectionRequestToTask.java} (66%)
 create mode 100644 server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
 create mode 100644 server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTaskAdditionalInformationDTO.java
 copy server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/{UploadCleanupTaskAdditionalInformationDTOTest.java => PopulateFilteringProjectionItemsTaskAdditionalInformationDTOTest.java} (80%)
 copy server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/{PopulateEmailQueryViewRequestToTaskTest.java => PopulateFilteringProjectionRequestToTaskTest.java} (50%)
 copy server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/{UploadCleanupTaskSerializationTest.java => PopulateFilteringProjectionTaskSerializationTest.java} (63%)
 create mode 100644 server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.additionalInformation.json
 create mode 100644 server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.task.json


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 05/05: JAMES-3777 Populate filtering projection upon Incremental change

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a133d0771ab48f36171294a8640877790fc00969
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 21 14:39:13 2023 +0700

    JAMES-3777 Populate filtering projection upon Incremental change
---
 .../filtering/CassandraFilteringProjection.java    | 59 ++++++++++++++--------
 .../impl/EventSourcingFilteringManagement.java     |  8 +--
 .../api/filtering/impl/FilteringAggregateId.java   |  4 ++
 .../data/jmap/PopulateFilteringProjectionTask.java |  2 +-
 ...pulateFilteringProjectionRequestToTaskTest.java |  2 +-
 5 files changed, 50 insertions(+), 25 deletions(-)

diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
index 2841a99878..486dddf655 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
@@ -10,17 +10,22 @@ import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjec
 
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.AggregateId;
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventId;
 import org.apache.james.eventsourcing.ReactiveSubscriber;
+import org.apache.james.jmap.api.filtering.Rule;
 import org.apache.james.jmap.api.filtering.Rules;
 import org.apache.james.jmap.api.filtering.Version;
 import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
 import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
+import org.apache.james.jmap.api.filtering.impl.IncrementalRuleChange;
 import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
 import org.reactivestreams.Publisher;
 
@@ -30,10 +35,11 @@ import com.datastax.oss.driver.api.core.cql.Row;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Mono;
 
-public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber {
+public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection {
     private final CassandraAsyncExecutor executor;
 
     private final PreparedStatement insertStatement;
@@ -82,27 +88,40 @@ public class CassandraFilteringProjection implements EventSourcingFilteringManag
     }
 
     @Override
-    public Publisher<Void> handleReactive(Event event) {
-        if (event instanceof RuleSetDefined) {
-            return persist((RuleSetDefined) event);
-        }
-        throw new RuntimeException("Unsupported event");
-    }
+    public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
+        return Optional.of(new ReactiveSubscriber() {
+            @Override
+            public Publisher<Void> handleReactive(Event event) {
+                if (event instanceof RuleSetDefined) {
+                    return persist((RuleSetDefined) event);
+                }
+                if (event instanceof IncrementalRuleChange) {
+                    return persist((IncrementalRuleChange) event);
+                }
+                throw new RuntimeException("Unsupported event");
+            }
 
-    @Override
-    public Optional<ReactiveSubscriber> subscriber() {
-        return Optional.of(this);
-    }
+            private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
+                return persistRules(ruleSetDefined.getAggregateId(), ruleSetDefined.eventId(), ruleSetDefined.getRules());
+            }
 
-    private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
-        try {
-            return executor.executeVoid(insertStatement.bind()
-                .setString(AGGREGATE_ID, ruleSetDefined.getAggregateId().asAggregateKey())
-                .setInt(EVENT_ID, ruleSetDefined.eventId().value())
-                .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(ruleSetDefined.getRules()))));
-        } catch (JsonProcessingException e) {
-            return Mono.error(e);
-        }
+            private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
+                try {
+                    return executor.executeVoid(insertStatement.bind()
+                        .setString(AGGREGATE_ID, aggregateId.asAggregateKey())
+                        .setInt(EVENT_ID, eventId.value())
+                        .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules))));
+                } catch (JsonProcessingException e) {
+                    return Mono.error(e);
+                }
+            }
+
+            private Mono<Void> persist(IncrementalRuleChange incrementalRuleChange) {
+                FilteringAggregateId filteringAggregateId = (FilteringAggregateId) incrementalRuleChange.getAggregateId();
+                return Mono.from(ruleLoader.apply(filteringAggregateId.getUsername()))
+                    .flatMap(rules -> persistRules(filteringAggregateId, incrementalRuleChange.eventId(), ImmutableList.copyOf(rules.getRules())));
+            }
+        });
     }
 
     private Version parseVersion(Row row) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 402c532c6c..08ac0d372f 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -22,6 +22,7 @@ package org.apache.james.jmap.api.filtering.impl;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 
@@ -50,7 +51,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
 
         Publisher<Version> getLatestVersion(Username username);
 
-        Optional<ReactiveSubscriber> subscriber();
+        Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader);
     }
 
     public static class NoReadProjection implements ReadProjection {
@@ -85,7 +86,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
         }
 
         @Override
-        public Optional<ReactiveSubscriber> subscriber() {
+        public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
             return Optional.empty();
         }
     }
@@ -104,7 +105,8 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
         this.readProjection = new NoReadProjection(eventStore);
         this.eventSourcingSystem = EventSourcingSystem.fromJava(
             ImmutableSet.of(new DefineRulesCommandHandler(eventStore)),
-            readProjection.subscriber().map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
+            readProjection.subscriber(aggregateId -> new NoReadProjection(eventStore).listRulesForUser(aggregateId))
+                .map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
             eventStore);
     }
 
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
index 3d20912b20..a43c10ecfa 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregateId.java
@@ -49,6 +49,10 @@ public class FilteringAggregateId implements AggregateId {
         return PREFIX + SEPARATOR + username.asString();
     }
 
+    public Username getUsername() {
+        return username;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof FilteringAggregateId) {
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
index fb408500fe..3d0bc6d2c5 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
@@ -133,7 +133,7 @@ public class PopulateFilteringProjectionTask implements Task {
             .concatMap(user -> Mono.from(noReadProjection.listRulesForUser(user))
                 .flatMap(rules ->
                     rules.getVersion().asEventId()
-                        .flatMap(eventId -> readProjection.subscriber()
+                        .flatMap(eventId -> readProjection.subscriber(any -> Mono.empty())
                             .map(s -> Mono.from(s.handleReactive(asEvent(user, rules, eventId)))))
                         .orElse(Mono.empty()))
                 .thenReturn(Result.COMPLETED)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
index 58b5fc547d..337d63f965 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
@@ -225,7 +225,7 @@ class PopulateFilteringProjectionRequestToTaskTest {
         Mockito.when(noReadProjection.listRulesForUser(any()))
             .thenReturn(Mono.just(new Rules(ImmutableList.of(rule), new Version(4))));
         ReactiveSubscriber subscriber = mock(ReactiveSubscriber.class);
-        Mockito.when(readProjection.subscriber()).thenReturn(Optional.of(subscriber));
+        Mockito.when(readProjection.subscriber(any())).thenReturn(Optional.of(subscriber));
         Mockito.when(subscriber.handleReactive(any())).thenReturn(Mono.empty());
 
         String taskId = with()


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/05: JAMES-3777 EventSourcingFilteringManagement avoid read after writes

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 199221edfcdd76a4ece98ee6db2b3e23f6dc287b
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 13:38:59 2023 +0700

    JAMES-3777 EventSourcingFilteringManagement avoid read after writes
    
    Version can be deduced from the events generated by the writes without conducting
    further reads.
---
 .../main/java/org/apache/james/jmap/api/filtering/Version.java   | 9 +++++++++
 .../api/filtering/impl/EventSourcingFilteringManagement.java     | 8 ++++----
 2 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java
index 780ef3da2b..8e87cd2de6 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java
@@ -20,12 +20,21 @@
 package org.apache.james.jmap.api.filtering;
 
 import java.util.Objects;
+import java.util.Optional;
+
+import org.apache.james.eventsourcing.EventId;
 
 import com.google.common.base.MoreObjects;
 
 public class Version {
     public static final Version INITIAL = new Version(-1);
 
+    public static Version from(Optional<EventId> eventId) {
+        return eventId.map(EventId::value)
+            .map(Version::new)
+            .orElse(Version.INITIAL);
+    }
+
     private final int version;
 
     public Version(int version) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 27cd52c377..8dfda3d241 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -60,10 +60,10 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
     @Override
     public Publisher<Version> defineRulesForUser(Username username, List<Rule> rules, Optional<Version> ifInState) {
         return Mono.from(eventSourcingSystem.dispatch(new DefineRulesCommand(username, rules, ifInState)))
-            .then(Mono.from(eventStore.getEventsOfAggregate(new FilteringAggregateId(username)))
-                .map(History::getVersionAsJava)
-                .map(eventIdOptional -> eventIdOptional.map(eventId -> new Version(eventId.value()))
-                    .orElse(Version.INITIAL)));
+            .map(events -> Version.from(events.stream()
+                .map(Event::eventId)
+                .sorted(Comparator.reverseOrder())
+                .findFirst()));
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 04/05: JAMES-3777 Create a webadmin exposed task to populate Cassandra filtering projection

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d79968462cf17fd096b4eb9302a0d5c042f49e0d
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 18:32:10 2023 +0700

    JAMES-3777 Create a webadmin exposed task to populate Cassandra filtering projection
---
 .../docs/modules/ROOT/pages/operate/webadmin.adoc  |  31 +++
 .../james/modules/data/CassandraJmapModule.java    |   1 +
 .../james/modules/data/MemoryDataJmapModule.java   |   1 +
 .../server/JmapTaskSerializationModule.java        |  22 ++
 .../james/modules/server/JmapTasksModule.java      |   4 +
 .../filtering/CassandraFilteringProjection.java    |   3 +-
 .../CassandraFilteringProjectionModule.java        |   1 -
 .../apache/james/jmap/api/filtering/Version.java   |   7 +
 .../impl/EventSourcingFilteringManagement.java     |   7 +-
 .../WebAdminServerIntegrationImmutableTest.java    |  17 ++
 .../apache/james/webadmin/data/jmap/Constants.java |   1 +
 ... PopulateFilteringProjectionRequestToTask.java} |  19 +-
 .../data/jmap/PopulateFilteringProjectionTask.java | 164 ++++++++++++++
 ...ringProjectionTaskAdditionalInformationDTO.java |  89 ++++++++
 ...tionItemsTaskAdditionalInformationDTOTest.java} |  25 ++-
 ...pulateFilteringProjectionRequestToTaskTest.java | 250 +++++++++++++++++++++
 ...teFilteringProjectionTaskSerializationTest.java |  51 +++++
 .../populateFilters.additionalInformation.json     |   6 +
 .../test/resources/json/populateFilters.task.json  |   3 +
 src/site/markdown/server/manage-webadmin.md        |  30 +++
 20 files changed, 717 insertions(+), 15 deletions(-)

diff --git a/server/apps/distributed-app/docs/modules/ROOT/pages/operate/webadmin.adoc b/server/apps/distributed-app/docs/modules/ROOT/pages/operate/webadmin.adoc
index 449216344b..3a62da769d 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/operate/webadmin.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/operate/webadmin.adoc
@@ -1044,6 +1044,37 @@ Response codes:
 * 201: Success. Corresponding task id is returned.
 * 400: Error in the request. Details can be found in the reported error.
 
+==== Recomputing Cassandra filtering projection
+
+You can force the reset of the Cassandra filtering projection by calling the following
+endpoint:
+
+....
+curl -XPOST /mailboxes?task=populateFilteringProjection
+....
+
+Will schedule a task.
+
+link:#_endpoints_returning_a_task[More details about endpoints returning
+a task].
+
+The scheduled task will have the following type
+`PopulateFilteringProjectionTask` and the following
+`additionalInformation`:
+
+....
+{
+  "type":"RecomputeAllPreviewsTask",
+  "processedUserCount": 3,
+  "failedUserCount": 2
+}
+....
+
+Response codes:
+
+* 201: Success. Corresponding task id is returned.
+* 400: Error in the request. Details can be found in the reported error.
+
 ==== ReIndexing action
 
 Be also aware of the limits of this API:
diff --git a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
index 96b1fa5aaa..7a3e8a65f4 100644
--- a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
+++ b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
@@ -82,6 +82,7 @@ public class CassandraJmapModule extends AbstractModule {
         bind(CustomIdentityDAO.class).to(CassandraCustomIdentityDAO.class);
 
         bind(CassandraFilteringProjection.class).in(Scopes.SINGLETON);
+        bind(EventSourcingFilteringManagement.ReadProjection.class).to(CassandraFilteringProjection.class);
 
         bind(CassandraPushSubscriptionRepository.class).in(Scopes.SINGLETON);
         bind(PushSubscriptionRepository.class).to(CassandraPushSubscriptionRepository.class);
diff --git a/server/container/guice/memory/src/main/java/org/apache/james/modules/data/MemoryDataJmapModule.java b/server/container/guice/memory/src/main/java/org/apache/james/modules/data/MemoryDataJmapModule.java
index 55ba982be9..c7974d853a 100644
--- a/server/container/guice/memory/src/main/java/org/apache/james/modules/data/MemoryDataJmapModule.java
+++ b/server/container/guice/memory/src/main/java/org/apache/james/modules/data/MemoryDataJmapModule.java
@@ -58,6 +58,7 @@ public class MemoryDataJmapModule extends AbstractModule {
 
         bind(EventSourcingFilteringManagement.class).in(Scopes.SINGLETON);
         bind(FilteringManagement.class).to(EventSourcingFilteringManagement.class);
+        bind(EventSourcingFilteringManagement.ReadProjection.class).to(EventSourcingFilteringManagement.NoReadProjection.class);
 
         bind(DefaultTextExtractor.class).in(Scopes.SINGLETON);
         bind(TextExtractor.class).to(JsoupTextExtractor.class);
diff --git a/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTaskSerializationModule.java b/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTaskSerializationModule.java
index 4684b32943..13a1a189e4 100644
--- a/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTaskSerializationModule.java
+++ b/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTaskSerializationModule.java
@@ -18,16 +18,20 @@
  ****************************************************************/
 package org.apache.james.modules.server;
 
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
 import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
 import org.apache.james.server.task.json.dto.TaskDTO;
 import org.apache.james.server.task.json.dto.TaskDTOModule;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.user.api.UsersRepository;
 import org.apache.james.webadmin.data.jmap.EmailQueryViewPopulator;
 import org.apache.james.webadmin.data.jmap.MessageFastViewProjectionCorrector;
 import org.apache.james.webadmin.data.jmap.PopulateEmailQueryViewTask;
 import org.apache.james.webadmin.data.jmap.PopulateEmailQueryViewTaskAdditionalInformationDTO;
+import org.apache.james.webadmin.data.jmap.PopulateFilteringProjectionTask;
+import org.apache.james.webadmin.data.jmap.PopulateFilteringProjectionTaskAdditionalInformationDTO;
 import org.apache.james.webadmin.data.jmap.RecomputeAllFastViewProjectionItemsTask;
 import org.apache.james.webadmin.data.jmap.RecomputeAllFastViewTaskAdditionalInformationDTO;
 import org.apache.james.webadmin.data.jmap.RecomputeUserFastViewProjectionItemsTask;
@@ -49,6 +53,13 @@ public class JmapTaskSerializationModule extends AbstractModule {
         return PopulateEmailQueryViewTask.module(populator);
     }
 
+    @ProvidesIntoSet
+    public TaskDTOModule<? extends Task, ? extends TaskDTO> populateFilteringProjectionTask(EventSourcingFilteringManagement.NoReadProjection noReadProjection,
+                                                                                            EventSourcingFilteringManagement.ReadProjection readProjection,
+                                                                                            UsersRepository usersRepository) {
+        return PopulateFilteringProjectionTask.module(noReadProjection, readProjection, usersRepository);
+    }
+
     @ProvidesIntoSet
     public TaskDTOModule<? extends Task, ? extends TaskDTO> recomputeUserJmapPreviewsTask(MessageFastViewProjectionCorrector corrector) {
         return RecomputeUserFastViewProjectionItemsTask.module(corrector);
@@ -76,6 +87,17 @@ public class JmapTaskSerializationModule extends AbstractModule {
         return PopulateEmailQueryViewTaskAdditionalInformationDTO.module();
     }
 
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends  AdditionalInformationDTO> populateFilteringProjectionAdditionalInformation() {
+        return PopulateFilteringProjectionTaskAdditionalInformationDTO.module();
+    }
+
+    @Named(DTOModuleInjections.WEBADMIN_DTO)
+    @ProvidesIntoSet
+    public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends  AdditionalInformationDTO> webAdminPopulateFilteringProjectionAdditionalInformation() {
+        return PopulateFilteringProjectionTaskAdditionalInformationDTO.module();
+    }
+
     @ProvidesIntoSet
     public AdditionalInformationDTOModule<? extends TaskExecutionDetails.AdditionalInformation, ? extends  AdditionalInformationDTO> recomputeUserJmapPreviewsAdditionalInformation() {
         return RecomputeUserFastViewTaskAdditionalInformationDTO.module();
diff --git a/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTasksModule.java b/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTasksModule.java
index aba434bb8b..c035925e66 100644
--- a/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTasksModule.java
+++ b/server/container/guice/protocols/webadmin-jmap/src/main/java/org/apache/james/modules/server/JmapTasksModule.java
@@ -20,6 +20,7 @@
 package org.apache.james.modules.server;
 
 import org.apache.james.webadmin.data.jmap.PopulateEmailQueryViewRequestToTask;
+import org.apache.james.webadmin.data.jmap.PopulateFilteringProjectionRequestToTask;
 import org.apache.james.webadmin.data.jmap.RecomputeAllFastViewProjectionItemsRequestToTask;
 import org.apache.james.webadmin.data.jmap.RecomputeUserFastViewProjectionItemsRequestToTask;
 import org.apache.james.webadmin.routes.MailboxesRoutes;
@@ -41,6 +42,9 @@ public class JmapTasksModule extends AbstractModule {
         Multibinder.newSetBinder(binder(), TaskFromRequestRegistry.TaskRegistration.class, Names.named(MailboxesRoutes.ALL_MAILBOXES_TASKS))
             .addBinding().to(PopulateEmailQueryViewRequestToTask.class);
 
+        Multibinder.newSetBinder(binder(), TaskFromRequestRegistry.TaskRegistration.class, Names.named(MailboxesRoutes.ALL_MAILBOXES_TASKS))
+            .addBinding().to(PopulateFilteringProjectionRequestToTask.class);
+
         Multibinder.newSetBinder(binder(), TaskFromRequestRegistry.TaskRegistration.class, Names.named(UserMailboxesRoutes.USER_MAILBOXES_OPERATIONS_INJECTION_KEY))
             .addBinding().to(RecomputeUserFastViewProjectionItemsRequestToTask.class);
 
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
index 0f0b74d98c..2841a99878 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
@@ -17,7 +17,6 @@ import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.core.Username;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.ReactiveSubscriber;
-import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.jmap.api.filtering.Rules;
 import org.apache.james.jmap.api.filtering.Version;
 import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
@@ -91,7 +90,7 @@ public class CassandraFilteringProjection implements EventSourcingFilteringManag
     }
 
     @Override
-    public Optional<Subscriber> subscriber() {
+    public Optional<ReactiveSubscriber> subscriber() {
         return Optional.of(this);
     }
 
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java
index 5387e795a4..0c1b50d8eb 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java
@@ -21,7 +21,6 @@ package org.apache.james.jmap.cassandra.filtering;
 
 import static com.datastax.oss.driver.api.core.type.DataTypes.INT;
 import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT;
-import static com.datastax.oss.driver.api.core.type.DataTypes.frozenListOf;
 
 import org.apache.james.backends.cassandra.components.CassandraModule;
 
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java
index 8e87cd2de6..5007ce7007 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/Version.java
@@ -69,4 +69,11 @@ public class Version {
     public int asInteger() {
         return version;
     }
+
+    public Optional<EventId> asEventId() {
+        if (version == -1) {
+            return Optional.empty();
+        }
+        return Optional.of(EventId.apply(version));
+    }
 }
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 8061428e2c..402c532c6c 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -28,6 +28,7 @@ import javax.inject.Inject;
 import org.apache.james.core.Username;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventSourcingSystem;
+import org.apache.james.eventsourcing.ReactiveSubscriber;
 import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.History;
@@ -49,7 +50,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
 
         Publisher<Version> getLatestVersion(Username username);
 
-        Optional<Subscriber> subscriber();
+        Optional<ReactiveSubscriber> subscriber();
     }
 
     public static class NoReadProjection implements ReadProjection {
@@ -84,7 +85,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
         }
 
         @Override
-        public Optional<Subscriber> subscriber() {
+        public Optional<ReactiveSubscriber> subscriber() {
             return Optional.empty();
         }
     }
@@ -103,7 +104,7 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
         this.readProjection = new NoReadProjection(eventStore);
         this.eventSourcingSystem = EventSourcingSystem.fromJava(
             ImmutableSet.of(new DefineRulesCommandHandler(eventStore)),
-            readProjection.subscriber().map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
+            readProjection.subscriber().map(Subscriber.class::cast).map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
             eventStore);
     }
 
diff --git a/server/protocols/webadmin-integration-test/webadmin-integration-test-common/src/main/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationImmutableTest.java b/server/protocols/webadmin-integration-test/webadmin-integration-test-common/src/main/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationImmutableTest.java
index 2c1e86c178..3eb1b128d6 100644
--- a/server/protocols/webadmin-integration-test/webadmin-integration-test-common/src/main/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationImmutableTest.java
+++ b/server/protocols/webadmin-integration-test/webadmin-integration-test-common/src/main/java/org/apache/james/webadmin/integration/WebAdminServerIntegrationImmutableTest.java
@@ -142,4 +142,21 @@ public abstract class WebAdminServerIntegrationImmutableTest {
             .body("status", is("completed"))
             .body("type", is("RecomputeAllFastViewProjectionItemsTask"));
     }
+
+    @Test
+    void jmapFilteringProjectionTasksShouldBeExposed() {
+        String taskId = with()
+            .queryParam("task", "populateFilteringProjection")
+            .post("/mailboxes")
+            .jsonPath()
+            .get("taskId");
+
+        given()
+            .basePath(TasksRoutes.BASE)
+        .when()
+            .get(taskId + "/await")
+        .then()
+            .body("status", is("completed"))
+            .body("type", is("PopulateFilteringProjectionTask"));
+    }
 }
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java
index dfc0ad0f99..1a97bb9b68 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java
@@ -24,4 +24,5 @@ import org.apache.james.webadmin.tasks.TaskRegistrationKey;
 public interface Constants {
     TaskRegistrationKey TASK_REGISTRATION_KEY = TaskRegistrationKey.of("recomputeFastViewProjectionItems");
     TaskRegistrationKey POPULATE_EMAIL_QUERY_VIEW = TaskRegistrationKey.of("populateEmailQueryView");
+    TaskRegistrationKey POPULATE_FILTERING_PROJECTION = TaskRegistrationKey.of("populateFilteringProjection");
 }
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTask.java
similarity index 57%
copy from server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java
copy to server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTask.java
index dfc0ad0f99..7df285aaab 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTask.java
@@ -19,9 +19,20 @@
 
 package org.apache.james.webadmin.data.jmap;
 
-import org.apache.james.webadmin.tasks.TaskRegistrationKey;
+import static org.apache.james.webadmin.data.jmap.Constants.POPULATE_FILTERING_PROJECTION;
 
-public interface Constants {
-    TaskRegistrationKey TASK_REGISTRATION_KEY = TaskRegistrationKey.of("recomputeFastViewProjectionItems");
-    TaskRegistrationKey POPULATE_EMAIL_QUERY_VIEW = TaskRegistrationKey.of("populateEmailQueryView");
+import javax.inject.Inject;
+
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
+
+public class PopulateFilteringProjectionRequestToTask extends TaskFromRequestRegistry.TaskRegistration {
+    @Inject
+    PopulateFilteringProjectionRequestToTask(EventSourcingFilteringManagement.NoReadProjection noReadProjection,
+                                             EventSourcingFilteringManagement.ReadProjection readProjection,
+                                             UsersRepository usersRepository) {
+        super(POPULATE_FILTERING_PROJECTION,
+            request -> new PopulateFilteringProjectionTask(noReadProjection, readProjection, usersRepository));
+    }
 }
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
new file mode 100644
index 0000000000..fb408500fe
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
@@ -0,0 +1,164 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.data.jmap;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.EventId;
+import org.apache.james.jmap.api.filtering.Rules;
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
+import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.TaskDTO;
+import org.apache.james.server.task.json.dto.TaskDTOModule;
+import org.apache.james.task.Task;
+import org.apache.james.task.TaskExecutionDetails;
+import org.apache.james.task.TaskType;
+import org.apache.james.user.api.UsersRepository;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class PopulateFilteringProjectionTask implements Task {
+    static final TaskType TASK_TYPE = TaskType.of("PopulateFilteringProjectionTask");
+
+    public static class AdditionalInformation implements TaskExecutionDetails.AdditionalInformation {
+        private static AdditionalInformation from(AtomicLong processedUserCount, AtomicLong failedUserCount) {
+            return new AdditionalInformation(processedUserCount.get(),
+                failedUserCount.get(),
+                Clock.systemUTC().instant());
+        }
+
+        private final long processedUserCount;
+        private final long failedUserCount;
+        private final Instant timestamp;
+
+        public AdditionalInformation(long processedUserCount, long failedUserCount, Instant timestamp) {
+            this.processedUserCount = processedUserCount;
+            this.failedUserCount = failedUserCount;
+            this.timestamp = timestamp;
+        }
+
+        public long getProcessedUserCount() {
+            return processedUserCount;
+        }
+
+        public long getFailedUserCount() {
+            return failedUserCount;
+        }
+
+        @Override
+        public Instant timestamp() {
+            return timestamp;
+        }
+    }
+
+    public static class PopulateFilteringProjectionTaskDTO implements TaskDTO {
+        private final String type;
+
+        public PopulateFilteringProjectionTaskDTO(@JsonProperty("type") String type) {
+            this.type = type;
+        }
+
+        @Override
+        public String getType() {
+            return type;
+        }
+    }
+
+    public static TaskDTOModule<PopulateFilteringProjectionTask, PopulateFilteringProjectionTaskDTO> module(EventSourcingFilteringManagement.NoReadProjection noReadProjection,
+                                                                                                            EventSourcingFilteringManagement.ReadProjection readProjection,
+                                                                                                            UsersRepository usersRepository) {
+        return DTOModule
+            .forDomainObject(PopulateFilteringProjectionTask.class)
+            .convertToDTO(PopulateFilteringProjectionTaskDTO.class)
+            .toDomainObjectConverter(dto -> asTask(noReadProjection, readProjection, usersRepository))
+            .toDTOConverter(PopulateFilteringProjectionTask::asDTO)
+            .typeName(TASK_TYPE.asString())
+            .withFactory(TaskDTOModule::new);
+    }
+
+    private static PopulateFilteringProjectionTaskDTO asDTO(PopulateFilteringProjectionTask task, String type) {
+        return new PopulateFilteringProjectionTaskDTO(type);
+    }
+
+    private static PopulateFilteringProjectionTask asTask(EventSourcingFilteringManagement.NoReadProjection noReadProjection,
+                                                          EventSourcingFilteringManagement.ReadProjection readProjection,
+                                                          UsersRepository usersRepository) {
+        return new PopulateFilteringProjectionTask(noReadProjection, readProjection, usersRepository);
+    }
+
+    private final EventSourcingFilteringManagement.NoReadProjection noReadProjection;
+    private final EventSourcingFilteringManagement.ReadProjection readProjection;
+    private final UsersRepository usersRepository;
+    private final AtomicLong processedUserCount = new AtomicLong(0L);
+    private final AtomicLong failedUserCount = new AtomicLong(0L);
+
+    public PopulateFilteringProjectionTask(EventSourcingFilteringManagement.NoReadProjection noReadProjection,
+                                           EventSourcingFilteringManagement.ReadProjection readProjection,
+                                           UsersRepository usersRepository) {
+        this.noReadProjection = noReadProjection;
+        this.readProjection = readProjection;
+        this.usersRepository = usersRepository;
+    }
+
+    @Override
+    public Result run() {
+        return Flux.from(usersRepository.listReactive())
+            .concatMap(user -> Mono.from(noReadProjection.listRulesForUser(user))
+                .flatMap(rules ->
+                    rules.getVersion().asEventId()
+                        .flatMap(eventId -> readProjection.subscriber()
+                            .map(s -> Mono.from(s.handleReactive(asEvent(user, rules, eventId)))))
+                        .orElse(Mono.empty()))
+                .thenReturn(Result.COMPLETED)
+                .doOnNext(next -> processedUserCount.incrementAndGet())
+                .onErrorResume(e -> {
+                    LOGGER.error("Failed populating Cassandra filter read projection for {}", user);
+                    failedUserCount.incrementAndGet();
+                    return Mono.just(Result.PARTIAL);
+                }))
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED))
+            .block();
+    }
+
+    private RuleSetDefined asEvent(Username user, Rules rules, EventId eventId) {
+        return new RuleSetDefined(new FilteringAggregateId(user), eventId, ImmutableList.copyOf(rules.getRules()));
+    }
+
+    @Override
+    public TaskType type() {
+        return TASK_TYPE;
+    }
+
+    @Override
+    public Optional<TaskExecutionDetails.AdditionalInformation> details() {
+        return Optional.of(AdditionalInformation.from(processedUserCount, failedUserCount));
+    }
+}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTaskAdditionalInformationDTO.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTaskAdditionalInformationDTO.java
new file mode 100644
index 0000000000..1b529c2914
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTaskAdditionalInformationDTO.java
@@ -0,0 +1,89 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.data.jmap;
+
+import java.time.Instant;
+
+import org.apache.james.json.DTOModule;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTO;
+import org.apache.james.server.task.json.dto.AdditionalInformationDTOModule;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.annotations.VisibleForTesting;
+
+public class PopulateFilteringProjectionTaskAdditionalInformationDTO implements AdditionalInformationDTO {
+    public static AdditionalInformationDTOModule<PopulateFilteringProjectionTask.AdditionalInformation, PopulateFilteringProjectionTaskAdditionalInformationDTO> module() {
+        return DTOModule.forDomainObject(PopulateFilteringProjectionTask.AdditionalInformation.class)
+            .convertToDTO(PopulateFilteringProjectionTaskAdditionalInformationDTO.class)
+            .toDomainObjectConverter(PopulateFilteringProjectionTaskAdditionalInformationDTO::toDomainObject)
+            .toDTOConverter(PopulateFilteringProjectionTaskAdditionalInformationDTO::toDTO)
+            .typeName(PopulateFilteringProjectionTask.TASK_TYPE.asString())
+            .withFactory(AdditionalInformationDTOModule::new);
+    }
+
+    private static PopulateFilteringProjectionTask.AdditionalInformation toDomainObject(PopulateFilteringProjectionTaskAdditionalInformationDTO dto) {
+        return new PopulateFilteringProjectionTask.AdditionalInformation(
+            dto.getProcessedUserCount(),
+            dto.getFailedUserCount(),
+            dto.timestamp);
+    }
+
+    private static PopulateFilteringProjectionTaskAdditionalInformationDTO toDTO(PopulateFilteringProjectionTask.AdditionalInformation details, String type) {
+        return new PopulateFilteringProjectionTaskAdditionalInformationDTO(
+            type,
+            details.timestamp(),
+            details.getProcessedUserCount(),
+            details.getFailedUserCount());
+    }
+
+    private final String type;
+    private final Instant timestamp;
+    private final long processedUserCount;
+    private final long failedUserCount;
+
+    @VisibleForTesting
+    PopulateFilteringProjectionTaskAdditionalInformationDTO(@JsonProperty("type") String type,
+                                                            @JsonProperty("timestamp") Instant timestamp,
+                                                            @JsonProperty("processedUserCount") long processedUserCount,
+                                                            @JsonProperty("failedUserCount") long failedUserCount) {
+        this.type = type;
+        this.timestamp = timestamp;
+        this.processedUserCount = processedUserCount;
+        this.failedUserCount = failedUserCount;
+    }
+
+    @Override
+    public String getType() {
+        return type;
+    }
+
+    @Override
+    public Instant getTimestamp() {
+        return timestamp;
+    }
+
+    public long getProcessedUserCount() {
+        return processedUserCount;
+    }
+
+    public long getFailedUserCount() {
+        return failedUserCount;
+    }
+}
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionItemsTaskAdditionalInformationDTOTest.java
similarity index 58%
copy from server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java
copy to server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionItemsTaskAdditionalInformationDTOTest.java
index dfc0ad0f99..5a5accfd1c 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/Constants.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionItemsTaskAdditionalInformationDTOTest.java
@@ -19,9 +19,24 @@
 
 package org.apache.james.webadmin.data.jmap;
 
-import org.apache.james.webadmin.tasks.TaskRegistrationKey;
+import java.time.Instant;
 
-public interface Constants {
-    TaskRegistrationKey TASK_REGISTRATION_KEY = TaskRegistrationKey.of("recomputeFastViewProjectionItems");
-    TaskRegistrationKey POPULATE_EMAIL_QUERY_VIEW = TaskRegistrationKey.of("populateEmailQueryView");
-}
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.Test;
+
+class PopulateFilteringProjectionItemsTaskAdditionalInformationDTOTest {
+    private static final Instant INSTANT = Instant.parse("2007-12-03T10:15:30.00Z");
+    private static final PopulateFilteringProjectionTask.AdditionalInformation DOMAIN_OBJECT = new PopulateFilteringProjectionTask.AdditionalInformation(
+        1,
+        2,
+        INSTANT);
+
+    @Test
+    void shouldMatchJsonSerializationContract() throws Exception {
+        JsonSerializationVerifier.dtoModule(PopulateFilteringProjectionTaskAdditionalInformationDTO.module())
+            .bean(DOMAIN_OBJECT)
+            .json(ClassLoaderUtils.getSystemResourceAsString("json/populateFilters.additionalInformation.json"))
+            .verify();
+    }
+}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
new file mode 100644
index 0000000000..58b5fc547d
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
@@ -0,0 +1,250 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.data.jmap;
+
+import static io.restassured.RestAssured.given;
+import static io.restassured.RestAssured.when;
+import static io.restassured.RestAssured.with;
+import static javax.mail.Flags.Flag.DELETED;
+import static org.apache.james.jmap.api.filtering.Rule.Condition.Comparator.CONTAINS;
+import static org.apache.james.jmap.api.filtering.Rule.Condition.Field.SUBJECT;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.Optional;
+
+import javax.mail.Flags;
+
+import org.apache.james.core.Username;
+import org.apache.james.domainlist.api.DomainList;
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.ReactiveSubscriber;
+import org.apache.james.jmap.api.filtering.Rule;
+import org.apache.james.jmap.api.filtering.Rules;
+import org.apache.james.jmap.api.filtering.Version;
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
+import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
+import org.apache.james.jmap.memory.projections.MemoryEmailQueryView;
+import org.apache.james.json.DTOConverter;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.extension.PreDeletionHook;
+import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
+import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
+import org.apache.james.mailbox.model.ComposedMessageId;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxPath;
+import org.apache.james.task.Hostname;
+import org.apache.james.task.MemoryTaskManager;
+import org.apache.james.task.TaskManager;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.memory.MemoryUsersRepository;
+import org.apache.james.util.streams.Limit;
+import org.apache.james.webadmin.Routes;
+import org.apache.james.webadmin.WebAdminServer;
+import org.apache.james.webadmin.WebAdminUtils;
+import org.apache.james.webadmin.routes.TasksRoutes;
+import org.apache.james.webadmin.tasks.TaskFromRequestRegistry;
+import org.apache.james.webadmin.utils.ErrorResponder;
+import org.apache.james.webadmin.utils.JsonTransformer;
+import org.eclipse.jetty.http.HttpStatus;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import com.google.common.collect.ImmutableList;
+
+import io.restassured.RestAssured;
+import reactor.core.publisher.Mono;
+import spark.Service;
+
+class PopulateFilteringProjectionRequestToTaskTest {
+    private static final String UNSCRAMBLED_SUBJECT = "this is the subject Frédéric MARTIN of the mail";
+
+    private EventSourcingFilteringManagement.NoReadProjection noReadProjection;
+    private EventSourcingFilteringManagement.ReadProjection readProjection;
+
+    private static final class JMAPRoutes implements Routes {
+        private final TaskManager taskManager;
+        private final EventSourcingFilteringManagement.NoReadProjection noReadProjection;
+        private final EventSourcingFilteringManagement.ReadProjection readProjection;
+        private final UsersRepository usersRepository;
+
+        private JMAPRoutes(EventSourcingFilteringManagement.NoReadProjection noReadProjection,
+                           EventSourcingFilteringManagement.ReadProjection readProjection,
+                           UsersRepository usersRepository,
+                           TaskManager taskManager) {
+            this.noReadProjection = noReadProjection;
+            this.readProjection = readProjection;
+            this.usersRepository = usersRepository;
+            this.taskManager = taskManager;
+        }
+
+        @Override
+        public String getBasePath() {
+            return BASE_PATH;
+        }
+
+        @Override
+        public void define(Service service) {
+            service.post(BASE_PATH,
+                TaskFromRequestRegistry.builder()
+                    .registrations(new PopulateFilteringProjectionRequestToTask(noReadProjection, readProjection, usersRepository))
+                    .buildAsRoute(taskManager),
+                new JsonTransformer());
+        }
+    }
+
+    static final String BASE_PATH = "/mailboxes";
+
+    static final DomainList NO_DOMAIN_LIST = null;
+    static final Username BOB = Username.of("bob");
+
+    private WebAdminServer webAdminServer;
+    private MemoryTaskManager taskManager;
+    private MailboxId bobInboxboxId;
+
+    @BeforeEach
+    void setUp() throws Exception {
+        JsonTransformer jsonTransformer = new JsonTransformer();
+        taskManager = new MemoryTaskManager(new Hostname("foo"));
+
+        InMemoryMailboxManager mailboxManager = InMemoryIntegrationResources.defaultResources().getMailboxManager();
+        MemoryUsersRepository usersRepository = MemoryUsersRepository.withoutVirtualHosting(NO_DOMAIN_LIST);
+        usersRepository.addUser(BOB, "pass");
+        MailboxSession bobSession = mailboxManager.createSystemSession(BOB);
+        bobInboxboxId = mailboxManager.createMailbox(MailboxPath.inbox(BOB), bobSession)
+            .get();
+
+        noReadProjection = mock(EventSourcingFilteringManagement.NoReadProjection.class);
+        readProjection = mock(EventSourcingFilteringManagement.ReadProjection.class);
+        webAdminServer = WebAdminUtils.createWebAdminServer(
+            new TasksRoutes(taskManager, jsonTransformer,
+                DTOConverter.of(PopulateFilteringProjectionTaskAdditionalInformationDTO.module())),
+            new JMAPRoutes(
+                noReadProjection,
+                readProjection,
+                usersRepository,
+                taskManager))
+            .start();
+
+        RestAssured.requestSpecification = WebAdminUtils.buildRequestSpecification(webAdminServer)
+            .setBasePath("/mailboxes")
+            .build();
+    }
+
+    @AfterEach
+    void afterEach() {
+        webAdminServer.destroy();
+        taskManager.stop();
+    }
+
+    @Test
+    void actionRequestParameterShouldBeCompulsory() {
+        when()
+            .post()
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("statusCode", is(400))
+            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+            .body("message", is("Invalid arguments supplied in the user request"))
+            .body("details", is("'action' query parameter is compulsory. Supported values are [populateFilteringProjection]"));
+    }
+
+    @Test
+    void postShouldFailUponEmptyAction() {
+        given()
+            .queryParam("action", "")
+            .post()
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("statusCode", is(400))
+            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+            .body("message", is("Invalid arguments supplied in the user request"))
+            .body("details", is("'action' query parameter cannot be empty or blank. Supported values are [populateFilteringProjection]"));
+    }
+
+    @Test
+    void postShouldFailUponInvalidAction() {
+        given()
+            .queryParam("action", "invalid")
+            .post()
+        .then()
+            .statusCode(HttpStatus.BAD_REQUEST_400)
+            .body("statusCode", is(400))
+            .body("type", is(ErrorResponder.ErrorType.INVALID_ARGUMENT.getType()))
+            .body("message", is("Invalid arguments supplied in the user request"))
+            .body("details", is("Invalid value supplied for query parameter 'action': invalid. Supported values are [populateFilteringProjection]"));
+    }
+
+    @Test
+    void postShouldCreateANewTask() {
+        given()
+            .queryParam("action", "populateFilteringProjection")
+            .post()
+        .then()
+            .statusCode(HttpStatus.CREATED_201)
+            .body("taskId", notNullValue());
+    }
+
+    @Test
+    void populateShouldUpdateProjection() {
+        Rule rule = Rule.builder()
+            .id(Rule.Id.of("2"))
+            .name("rule 2")
+            .condition(Rule.Condition.of(SUBJECT, CONTAINS, UNSCRAMBLED_SUBJECT))
+            .action(Rule.Action.of(Rule.Action.AppendInMailboxes.withMailboxIds(ImmutableList.of(bobInboxboxId.serialize()))))
+            .build();
+
+        Mockito.when(noReadProjection.listRulesForUser(any()))
+            .thenReturn(Mono.just(new Rules(ImmutableList.of(rule), new Version(4))));
+        ReactiveSubscriber subscriber = mock(ReactiveSubscriber.class);
+        Mockito.when(readProjection.subscriber()).thenReturn(Optional.of(subscriber));
+        Mockito.when(subscriber.handleReactive(any())).thenReturn(Mono.empty());
+
+        String taskId = with()
+            .queryParam("action", "populateFilteringProjection")
+            .post()
+            .jsonPath()
+            .get("taskId");
+
+        with()
+            .basePath(TasksRoutes.BASE)
+            .get(taskId + "/await");
+
+        ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+        verify(subscriber, times(1)).handleReactive(captor.capture());
+
+        assertThat(captor.getValue().eventId()).isEqualTo(EventId.fromSerialized(4));
+        assertThat(captor.getValue().getAggregateId()).isEqualTo(new FilteringAggregateId(BOB));
+        assertThat(captor.getValue()).isInstanceOf(RuleSetDefined.class);
+        RuleSetDefined ruleSetDefined = (RuleSetDefined) captor.getValue();
+        assertThat(ruleSetDefined.getRules()).containsOnly(rule);
+    }
+}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTaskSerializationTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTaskSerializationTest.java
new file mode 100644
index 0000000000..03c4401d19
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTaskSerializationTest.java
@@ -0,0 +1,51 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.data.jmap;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.james.JsonSerializationVerifier;
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.util.ClassLoaderUtils;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class PopulateFilteringProjectionTaskSerializationTest {
+    private EventSourcingFilteringManagement.NoReadProjection noReadProjection;
+    private EventSourcingFilteringManagement.ReadProjection readProjection;
+    private UsersRepository usersRepository;
+
+    @BeforeEach
+    void setUp() {
+        noReadProjection = new EventSourcingFilteringManagement.NoReadProjection(mock(EventStore.class));
+        readProjection = mock(EventSourcingFilteringManagement.ReadProjection.class);
+        usersRepository = mock(UsersRepository.class);
+    }
+
+    @Test
+    void shouldMatchJsonSerializationContract() throws Exception {
+        JsonSerializationVerifier.dtoModule(PopulateFilteringProjectionTask.module(noReadProjection, readProjection, usersRepository))
+            .bean(new PopulateFilteringProjectionTask(noReadProjection, readProjection, usersRepository))
+            .json(ClassLoaderUtils.getSystemResourceAsString("json/populateFilters.task.json"))
+            .verify();
+    }
+}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.additionalInformation.json b/server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.additionalInformation.json
new file mode 100644
index 0000000000..8e2fd25bd7
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.additionalInformation.json
@@ -0,0 +1,6 @@
+{
+  "type":"PopulateFilteringProjectionTask",
+  "timestamp":"2007-12-03T10:15:30Z",
+  "processedUserCount":1,
+  "failedUserCount":2
+}
\ No newline at end of file
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.task.json b/server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.task.json
new file mode 100644
index 0000000000..65b1b1f29d
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/resources/json/populateFilters.task.json
@@ -0,0 +1,3 @@
+{
+  "type":"PopulateFilteringProjectionTask"
+}
\ No newline at end of file
diff --git a/src/site/markdown/server/manage-webadmin.md b/src/site/markdown/server/manage-webadmin.md
index f918215dd1..ed18c2c9b2 100644
--- a/src/site/markdown/server/manage-webadmin.md
+++ b/src/site/markdown/server/manage-webadmin.md
@@ -1545,6 +1545,36 @@ Response codes:
  - 201: Success. Corresponding task id is returned.
  - 400: Error in the request. Details can be found in the reported error.
  - 404: User not found.
+ 
+ 
+### Recomputing Cassandra filtering projection
+
+You can force the reset of the Cassandra filtering projection by calling the following
+endpoint:
+
+```
+curl -XPOST /mailboxes?task=populateFilteringProjection
+```
+
+Will schedule a task.
+
+[More details about endpoints returning a task](#Endpoints_returning_a_task).
+
+The scheduled task will have the following type
+`PopulateFilteringProjectionTask` and the following
+`additionalInformation`:
+
+```{
+  "type":"RecomputeAllPreviewsTask",
+  "processedUserCount": 3,
+  "failedUserCount": 2
+}
+```
+
+Response codes:
+
+ - 201: Success. Corresponding task id is returned.
+ - 400: Error in the request. Details can be found in the reported error.
 
 ## Administrating quotas by users
 


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/05: JAMES-3777 Add a Cassandra projection for JMAP filters

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cafc0d9e149be77cf71906b1375ddd0c561535d3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 15:20:00 2023 +0700

    JAMES-3777 Add a Cassandra projection for JMAP filters
    
    Writes is still O(n2) but using the read projection allows
    reads to be O(n). It also enables avoiding LWTs upon reads.
---
 .../docs/modules/ROOT/pages/configure/jmap.adoc    |   5 +
 .../james/modules/data/CassandraJmapModule.java    |  34 +++++-
 .../filtering/CassandraFilteringProjection.java    | 119 +++++++++++++++++++++
 .../CassandraFilteringProjectionModule.java}       |  30 ++++--
 ...urcingFilteringManagementNoProjectionTest.java} |  34 ++++--
 ...sandraEventSourcingFilteringManagementTest.java |  32 +++++-
 .../api/filtering/FilteringManagementContract.java |  92 ++++++++--------
 ...MemoryEventSourcingFilteringManagementTest.java |  15 ++-
 src/site/xdoc/server/config-jmap.xml               |   6 ++
 9 files changed, 294 insertions(+), 73 deletions(-)

diff --git a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jmap.adoc b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jmap.adoc
index 27ce9a0a01..eb3e31bb14 100644
--- a/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jmap.adoc
+++ b/server/apps/distributed-app/docs/modules/ROOT/pages/configure/jmap.adoc
@@ -87,6 +87,11 @@ then `capabilities."urn:ietf:params:jmap:websocket".url` in response will be "ws
 | webpush.prevent.server.side.request.forgery
 | Optional boolean. Prevent server side request forgery by preventing calls to the private network ranges. Defaults to true, can be disabled for testing.
 
+| cassandra.filter.projection.activated
+|Optional boolean. Defaults to false. Casandra backends only. Whether to use or not the Cassandra projection
+for JMAP filters. This projection optimizes reads, but needs to be correctly populated. Turning it on on
+systems with filters already defined would result in those filters to be not read.
+
 |===
 
 == Wire tapping
diff --git a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
index 1dd7a16301..96b1fa5aaa 100644
--- a/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
+++ b/server/container/guice/cassandra/src/main/java/org/apache/james/modules/data/CassandraJmapModule.java
@@ -19,9 +19,13 @@
 
 package org.apache.james.modules.data;
 
+import java.io.FileNotFoundException;
+
+import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.core.healthcheck.HealthCheck;
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTO;
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.EventDTOModule;
 import org.apache.james.jmap.api.access.AccessTokenRepository;
@@ -38,6 +42,8 @@ import org.apache.james.jmap.cassandra.access.CassandraAccessModule;
 import org.apache.james.jmap.cassandra.access.CassandraAccessTokenRepository;
 import org.apache.james.jmap.cassandra.change.CassandraEmailChangeModule;
 import org.apache.james.jmap.cassandra.change.CassandraMailboxChangeModule;
+import org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjection;
+import org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjectionModule;
 import org.apache.james.jmap.cassandra.filtering.FilteringRuleSetDefineDTOModules;
 import org.apache.james.jmap.cassandra.identity.CassandraCustomIdentityDAO;
 import org.apache.james.jmap.cassandra.identity.CassandraCustomIdentityModule;
@@ -52,14 +58,16 @@ import org.apache.james.jmap.cassandra.upload.UploadConfiguration;
 import org.apache.james.jmap.cassandra.upload.UploadDAO;
 import org.apache.james.jmap.cassandra.upload.UploadModule;
 import org.apache.james.user.api.UsernameChangeTaskStep;
+import org.apache.james.utils.PropertiesProvider;
 
 import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
 import com.google.inject.Scopes;
+import com.google.inject.Singleton;
 import com.google.inject.TypeLiteral;
 import com.google.inject.multibindings.Multibinder;
 
 public class CassandraJmapModule extends AbstractModule {
-
     @Override
     protected void configure() {
         bind(CassandraAccessTokenRepository.class).in(Scopes.SINGLETON);
@@ -73,8 +81,7 @@ public class CassandraJmapModule extends AbstractModule {
         bind(CassandraCustomIdentityDAO.class).in(Scopes.SINGLETON);
         bind(CustomIdentityDAO.class).to(CassandraCustomIdentityDAO.class);
 
-        bind(EventSourcingFilteringManagement.class).in(Scopes.SINGLETON);
-        bind(FilteringManagement.class).to(EventSourcingFilteringManagement.class);
+        bind(CassandraFilteringProjection.class).in(Scopes.SINGLETON);
 
         bind(CassandraPushSubscriptionRepository.class).in(Scopes.SINGLETON);
         bind(PushSubscriptionRepository.class).to(CassandraPushSubscriptionRepository.class);
@@ -97,6 +104,7 @@ public class CassandraJmapModule extends AbstractModule {
         cassandraDataDefinitions.addBinding().toInstance(CassandraEmailChangeModule.MODULE);
         cassandraDataDefinitions.addBinding().toInstance(UploadModule.MODULE);
         cassandraDataDefinitions.addBinding().toInstance(CassandraPushSubscriptionModule.MODULE);
+        cassandraDataDefinitions.addBinding().toInstance(CassandraFilteringProjectionModule.MODULE);
         cassandraDataDefinitions.addBinding().toInstance(CassandraCustomIdentityModule.MODULE());
 
         Multibinder<EventDTOModule<? extends Event, ? extends EventDTO>> eventDTOModuleBinder = Multibinder.newSetBinder(binder(), new TypeLiteral<>() {});
@@ -108,4 +116,24 @@ public class CassandraJmapModule extends AbstractModule {
             .addBinding()
             .to(FilterUsernameChangeTaskStep.class);
     }
+
+    @Singleton
+    @Provides
+    FilteringManagement provideFilteringManagement(EventStore eventStore, CassandraFilteringProjection cassandraFilteringProjection,
+                                                   PropertiesProvider propertiesProvider) throws ConfigurationException {
+        if (cassandraFilterProjectionActivated(propertiesProvider)) {
+            return new EventSourcingFilteringManagement(eventStore, cassandraFilteringProjection);
+        } else {
+            return new EventSourcingFilteringManagement(eventStore, new EventSourcingFilteringManagement.NoReadProjection(eventStore));
+        }
+    }
+
+    private boolean cassandraFilterProjectionActivated(PropertiesProvider propertiesProvider) throws ConfigurationException {
+        try {
+            return propertiesProvider.getConfiguration("jmap")
+                .getBoolean("cassandra.filter.projection.activated", false);
+        } catch (FileNotFoundException e) {
+            return false;
+        }
+    }
 }
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
new file mode 100644
index 0000000000..0f0b74d98c
--- /dev/null
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
@@ -0,0 +1,119 @@
+package org.apache.james.jmap.cassandra.filtering;
+
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
+import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
+import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjectionModule.AGGREGATE_ID;
+import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjectionModule.EVENT_ID;
+import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjectionModule.RULES;
+import static org.apache.james.jmap.cassandra.filtering.CassandraFilteringProjectionModule.TABLE_NAME;
+
+import java.util.List;
+import java.util.Optional;
+
+import javax.inject.Inject;
+
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
+import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.ReactiveSubscriber;
+import org.apache.james.eventsourcing.Subscriber;
+import org.apache.james.jmap.api.filtering.Rules;
+import org.apache.james.jmap.api.filtering.Version;
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
+import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
+import org.reactivestreams.Publisher;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.Row;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import reactor.core.publisher.Mono;
+
+public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber {
+    private final CassandraAsyncExecutor executor;
+
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement readStatement;
+    private final PreparedStatement readVersionStatement;
+    private final ObjectMapper objectMapper;
+
+    @Inject
+    public CassandraFilteringProjection(CqlSession session) {
+        executor = new CassandraAsyncExecutor(session);
+
+        insertStatement = session.prepare(insertInto(TABLE_NAME)
+            .value(AGGREGATE_ID, bindMarker(AGGREGATE_ID))
+            .value(EVENT_ID, bindMarker(EVENT_ID))
+            .value(RULES, bindMarker(RULES))
+            .build());
+        readStatement = session.prepare(selectFrom(TABLE_NAME).all()
+            .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
+            .build());
+        readVersionStatement = session.prepare(selectFrom(TABLE_NAME).column(EVENT_ID)
+            .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
+            .build());
+
+        objectMapper = new ObjectMapper();
+    }
+
+    @Override
+    public Publisher<Rules> listRulesForUser(Username username) {
+        return executor.executeSingleRow(readStatement.bind()
+            .setString(AGGREGATE_ID, new FilteringAggregateId(username).asAggregateKey()))
+            .handle((row, sink) -> {
+                try {
+                    Rules rules = parseRules(row);
+                    sink.next(rules);
+                } catch (JsonProcessingException e) {
+                    sink.error(e);
+                }
+            });
+    }
+
+    @Override
+    public Publisher<Version> getLatestVersion(Username username) {
+        return executor.executeSingleRow(readVersionStatement.bind()
+            .setString(AGGREGATE_ID, new FilteringAggregateId(username).asAggregateKey()))
+            .map(this::parseVersion);
+    }
+
+    @Override
+    public Publisher<Void> handleReactive(Event event) {
+        if (event instanceof RuleSetDefined) {
+            return persist((RuleSetDefined) event);
+        }
+        throw new RuntimeException("Unsupported event");
+    }
+
+    @Override
+    public Optional<Subscriber> subscriber() {
+        return Optional.of(this);
+    }
+
+    private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
+        try {
+            return executor.executeVoid(insertStatement.bind()
+                .setString(AGGREGATE_ID, ruleSetDefined.getAggregateId().asAggregateKey())
+                .setInt(EVENT_ID, ruleSetDefined.eventId().value())
+                .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(ruleSetDefined.getRules()))));
+        } catch (JsonProcessingException e) {
+            return Mono.error(e);
+        }
+    }
+
+    private Version parseVersion(Row row) {
+        return new Version(row.getInt(EVENT_ID));
+    }
+
+    private Rules parseRules(Row row) throws JsonProcessingException {
+        String serializedRules = row.getString(RULES);
+        List<RuleDTO> ruleDTOS = objectMapper.readValue(serializedRules, new TypeReference<>() {});
+        Version version = parseVersion(row);
+        return new Rules(RuleDTO.toRules(ruleDTOS), version);
+    }
+}
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java
similarity index 59%
copy from server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java
copy to server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java
index 9972054f41..5387e795a4 100644
--- a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjectionModule.java
@@ -19,16 +19,24 @@
 
 package org.apache.james.jmap.cassandra.filtering;
 
-import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
-import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
-import org.apache.james.jmap.api.filtering.FilteringManagementContract;
-import org.junit.jupiter.api.extension.RegisterExtension;
+import static com.datastax.oss.driver.api.core.type.DataTypes.INT;
+import static com.datastax.oss.driver.api.core.type.DataTypes.TEXT;
+import static com.datastax.oss.driver.api.core.type.DataTypes.frozenListOf;
 
-class CassandraEventSourcingFilteringManagementTest implements FilteringManagementContract {
-    @RegisterExtension
-    static CassandraEventStoreExtension eventStoreExtension =
-        new CassandraEventStoreExtension(JsonEventSerializer.forModules(
-                FilteringRuleSetDefineDTOModules.FILTERING_RULE_SET_DEFINED,
-                FilteringRuleSetDefineDTOModules.FILTERING_INCREMENT)
-            .withoutNestedType());
+import org.apache.james.backends.cassandra.components.CassandraModule;
+
+public interface CassandraFilteringProjectionModule {
+    String TABLE_NAME = "filters_projection";
+
+    String AGGREGATE_ID = "aggregate_id";
+    String EVENT_ID = "event_id";
+    String RULES = "rules";
+
+    CassandraModule MODULE = CassandraModule.table(TABLE_NAME)
+        .comment("Holds read projection for the event sourcing system managing JMAP filters.")
+        .statement(statement -> types -> statement
+            .withPartitionKey(AGGREGATE_ID, TEXT)
+            .withColumn(EVENT_ID, INT)
+            .withColumn(RULES, TEXT))
+        .build();
 }
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementNoProjectionTest.java
similarity index 54%
copy from server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java
copy to server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementNoProjectionTest.java
index 9972054f41..abe1636d3b 100644
--- a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementNoProjectionTest.java
@@ -19,16 +19,38 @@
 
 package org.apache.james.jmap.cassandra.filtering;
 
-import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule$;
+import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
+import org.apache.james.jmap.api.filtering.FilteringManagement;
 import org.apache.james.jmap.api.filtering.FilteringManagementContract;
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
-class CassandraEventSourcingFilteringManagementTest implements FilteringManagementContract {
+class CassandraEventSourcingFilteringManagementNoProjectionTest implements FilteringManagementContract {
     @RegisterExtension
-    static CassandraEventStoreExtension eventStoreExtension =
-        new CassandraEventStoreExtension(JsonEventSerializer.forModules(
+    static CassandraClusterExtension eventStoreExtension = new CassandraClusterExtension(CassandraModule.aggregateModules(
+        CassandraEventStoreModule$.MODULE$.MODULE(),
+        CassandraFilteringProjectionModule.MODULE));
+
+    private EventStore eventStore;
+
+    @BeforeEach
+    void setUp() {
+        eventStore = new CassandraEventStore(new EventStoreDao(eventStoreExtension.getCassandraCluster().getConf(),
+            JsonEventSerializer.forModules(
                 FilteringRuleSetDefineDTOModules.FILTERING_RULE_SET_DEFINED,
-                FilteringRuleSetDefineDTOModules.FILTERING_INCREMENT)
-            .withoutNestedType());
+                FilteringRuleSetDefineDTOModules.FILTERING_INCREMENT).withoutNestedType()));
+    }
+
+    @Override
+    public FilteringManagement instantiateFilteringManagement() {
+        return new EventSourcingFilteringManagement(eventStore,
+            new EventSourcingFilteringManagement.NoReadProjection(eventStore));
+    }
 }
diff --git a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java
index 9972054f41..714851f218 100644
--- a/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java
+++ b/server/data/data-jmap-cassandra/src/test/java/org/apache/james/jmap/cassandra/filtering/CassandraEventSourcingFilteringManagementTest.java
@@ -19,16 +19,38 @@
 
 package org.apache.james.jmap.cassandra.filtering;
 
-import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreExtension;
+import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.components.CassandraModule;
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStore;
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreModule$;
+import org.apache.james.eventsourcing.eventstore.cassandra.EventStoreDao;
 import org.apache.james.eventsourcing.eventstore.cassandra.JsonEventSerializer;
+import org.apache.james.jmap.api.filtering.FilteringManagement;
 import org.apache.james.jmap.api.filtering.FilteringManagementContract;
+import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
 class CassandraEventSourcingFilteringManagementTest implements FilteringManagementContract {
     @RegisterExtension
-    static CassandraEventStoreExtension eventStoreExtension =
-        new CassandraEventStoreExtension(JsonEventSerializer.forModules(
+    static CassandraClusterExtension eventStoreExtension = new CassandraClusterExtension(CassandraModule.aggregateModules(
+        CassandraEventStoreModule$.MODULE$.MODULE(),
+        CassandraFilteringProjectionModule.MODULE));
+
+    private EventStore eventStore;
+
+    @BeforeEach
+    void setUp() {
+        eventStore = new CassandraEventStore(new EventStoreDao(eventStoreExtension.getCassandraCluster().getConf(),
+            JsonEventSerializer.forModules(
                 FilteringRuleSetDefineDTOModules.FILTERING_RULE_SET_DEFINED,
-                FilteringRuleSetDefineDTOModules.FILTERING_INCREMENT)
-            .withoutNestedType());
+                FilteringRuleSetDefineDTOModules.FILTERING_INCREMENT).withoutNestedType()));
+    }
+
+    @Override
+    public FilteringManagement instantiateFilteringManagement() {
+        return new EventSourcingFilteringManagement(eventStore,
+            new CassandraFilteringProjection(eventStoreExtension.getCassandraCluster().getConf()));
+    }
 }
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
index 47ad1ba064..b8cdb93f48 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/FilteringManagementContract.java
@@ -46,26 +46,24 @@ public interface FilteringManagementContract {
     String BART_SIMPSON_CARTOON = "bart@simpson.cartoon";
     Username USERNAME = Username.of(BART_SIMPSON_CARTOON);
 
-    default FilteringManagement instantiateFilteringManagement(EventStore eventStore) {
-        return new EventSourcingFilteringManagement(eventStore);
-    }
+    FilteringManagement instantiateFilteringManagement();
 
     @Test
-    default void listingRulesForUnknownUserShouldReturnEmptyList(EventStore eventStore) {
-        assertThat(Mono.from(instantiateFilteringManagement(eventStore).listRulesForUser(USERNAME)).block())
+    default void listingRulesForUnknownUserShouldReturnEmptyList() {
+        assertThat(Mono.from(instantiateFilteringManagement().listRulesForUser(USERNAME)).block())
             .isEqualTo(new Rules(ImmutableList.of(), new Version(-1)));
     }
 
     @Test
-    default void listingRulesShouldThrowWhenNullUser(EventStore eventStore) {
+    default void listingRulesShouldThrowWhenNullUser() {
         Username username = null;
-        assertThatThrownBy(() -> instantiateFilteringManagement(eventStore).listRulesForUser(username))
+        assertThatThrownBy(() -> instantiateFilteringManagement().listRulesForUser(username))
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    default void listingRulesShouldReturnDefinedRules(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void listingRulesShouldReturnDefinedRules() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_1, RULE_2)).block();
 
@@ -74,8 +72,8 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void listingRulesShouldReturnLastDefinedRules(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void listingRulesShouldReturnLastDefinedRules() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_1, RULE_2)).block();
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_2, RULE_1)).block();
@@ -85,24 +83,24 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void definingRulesShouldThrowWhenDuplicateRules(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void definingRulesShouldThrowWhenDuplicateRules() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         assertThatThrownBy(() -> Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_1, RULE_1)).block())
             .isInstanceOf(IllegalArgumentException.class);
     }
 
     @Test
-    default void definingRulesShouldThrowWhenNullUser(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void definingRulesShouldThrowWhenNullUser() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         assertThatThrownBy(() -> Mono.from(testee.defineRulesForUser(null, Optional.empty(), RULE_1, RULE_1)).block())
             .isInstanceOf(NullPointerException.class);
     }
 
     @Test
-    default void definingRulesShouldThrowWhenNullRuleList(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void definingRulesShouldThrowWhenNullRuleList() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         List<Rule> rules = null;
         assertThatThrownBy(() -> Mono.from(testee.defineRulesForUser(USERNAME, rules, Optional.empty())).block())
@@ -110,8 +108,8 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void definingRulesShouldKeepOrdering(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void definingRulesShouldKeepOrdering() {
+        FilteringManagement testee = instantiateFilteringManagement();
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
 
 
@@ -120,8 +118,8 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void definingEmptyRuleListShouldRemoveExistingRules(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void definingEmptyRuleListShouldRemoveExistingRules() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
         Mono.from(testee.clearRulesForUser(USERNAME)).block();
@@ -131,8 +129,8 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void allFieldsAndComparatorShouldWellBeStored(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void allFieldsAndComparatorShouldWellBeStored() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_FROM, RULE_RECIPIENT, RULE_SUBJECT, RULE_TO, RULE_1)).block();
 
@@ -141,16 +139,16 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void setRulesWithEmptyVersionShouldSucceed(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void setRulesWithEmptyVersionShouldSucceed() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         assertThat(Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block())
             .isEqualTo(new Version(0));
     }
 
     @Test
-    default void modifyExistingRulesWithWrongCurrentVersionShouldFail(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void modifyExistingRulesWithWrongCurrentVersionShouldFail() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
 
@@ -159,8 +157,8 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void modifyExistingRulesWithRightVersionShouldSucceed(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void modifyExistingRulesWithRightVersionShouldSucceed() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
 
@@ -169,8 +167,8 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void givenARulesWithVersionIsOneThenUpdateRulesWithIfInStateIsOneShouldSucceed(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void givenARulesWithVersionIsOneThenUpdateRulesWithIfInStateIsOneShouldSucceed() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2)).block();
@@ -183,16 +181,16 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void setRulesWithEmptyIfInStateWhenNonStateIsDefinedShouldSucceed(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void setRulesWithEmptyIfInStateWhenNonStateIsDefinedShouldSucceed() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         assertThat(Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2)).block())
             .isEqualTo(new Version(0));
     }
 
     @Test
-    default void setRulesWithEmptyIfInStateWhenAStateIsDefinedShouldSucceed(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void setRulesWithEmptyIfInStateWhenAStateIsDefinedShouldSucceed() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2)).block();
 
@@ -201,16 +199,16 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void setRulesWithIfInStateIsInitialWhenNonStateIsDefinedShouldSucceed(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void setRulesWithIfInStateIsInitialWhenNonStateIsDefinedShouldSucceed() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         assertThat(Mono.from(testee.defineRulesForUser(USERNAME, Optional.of(Version.INITIAL), RULE_3, RULE_2)).block())
             .isEqualTo(new Version(0));
     }
 
     @Test
-    default void setRulesWithIfInStateIsInitialWhenAStateIsDefinedShouldFail(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void setRulesWithIfInStateIsInitialWhenAStateIsDefinedShouldFail() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
 
@@ -222,24 +220,24 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void setRulesWithIfInStateIsOneWhenNonStateIsDefinedShouldFail(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void setRulesWithIfInStateIsOneWhenNonStateIsDefinedShouldFail() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         assertThatThrownBy(() -> Mono.from(testee.defineRulesForUser(USERNAME, Optional.of(new Version(1)), RULE_2, RULE_1)).block())
             .isInstanceOf(StateMismatchException.class);
     }
 
     @Test
-    default void getLatestVersionWhenNonVersionIsDefinedShouldReturnVersionInitial(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void getLatestVersionWhenNonVersionIsDefinedShouldReturnVersionInitial() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         assertThat(Mono.from(testee.getLatestVersion(USERNAME)).block())
             .isEqualTo(Version.INITIAL);
     }
 
     @Test
-    default void getLatestVersionAfterSetRulesFirstTimeShouldReturnVersionZero(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void getLatestVersionAfterSetRulesFirstTimeShouldReturnVersionZero() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
 
@@ -248,8 +246,8 @@ public interface FilteringManagementContract {
     }
 
     @Test
-    default void getLatestVersionAfterSetRulesNotSucceedShouldReturnOldVersion(EventStore eventStore) {
-        FilteringManagement testee = instantiateFilteringManagement(eventStore);
+    default void getLatestVersionAfterSetRulesNotSucceedShouldReturnOldVersion() {
+        FilteringManagement testee = instantiateFilteringManagement();
 
         Mono.from(testee.defineRulesForUser(USERNAME, Optional.empty(), RULE_3, RULE_2, RULE_1)).block();
         assertThat(Mono.from(testee.getLatestVersion(USERNAME)).block())
diff --git a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/impl/InMemoryEventSourcingFilteringManagementTest.java b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/impl/InMemoryEventSourcingFilteringManagementTest.java
index 39dffe7002..1dc1f33af1 100644
--- a/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/impl/InMemoryEventSourcingFilteringManagementTest.java
+++ b/server/data/data-jmap/src/test/java/org/apache/james/jmap/api/filtering/impl/InMemoryEventSourcingFilteringManagementTest.java
@@ -19,11 +19,24 @@
 
 package org.apache.james.jmap.api.filtering.impl;
 
+import org.apache.james.eventsourcing.eventstore.EventStore;
+import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStore;
 import org.apache.james.eventsourcing.eventstore.memory.InMemoryEventStoreExtension;
+import org.apache.james.jmap.api.filtering.FilteringManagement;
 import org.apache.james.jmap.api.filtering.FilteringManagementContract;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.extension.ExtendWith;
 
-@ExtendWith(InMemoryEventStoreExtension.class)
 public class InMemoryEventSourcingFilteringManagementTest implements FilteringManagementContract {
+    private EventStore eventStore;
 
+    @BeforeEach
+    void setUp() {
+        eventStore = new InMemoryEventStore();
+    }
+
+    @Override
+    public FilteringManagement instantiateFilteringManagement() {
+        return new EventSourcingFilteringManagement(eventStore);
+    }
 }
diff --git a/src/site/xdoc/server/config-jmap.xml b/src/site/xdoc/server/config-jmap.xml
index 9cfc5fa1c7..ff375f99ad 100644
--- a/src/site/xdoc/server/config-jmap.xml
+++ b/src/site/xdoc/server/config-jmap.xml
@@ -125,6 +125,12 @@
                     <dd>Optional boolean. Prevent server side request forgery by preventing calls to the private network
                         ranges. Defaults to true, can be disabled for testing.
                     </dd>
+
+                    <dt><strong>cassandra.filter.projection.activated</strong></dt>
+                    <dd>Optional boolean. Defaults to false. Casandra backends only. Whether to use or not the Cassandra projection
+                        for JMAP filters. This projection optimizes reads, but needs to be correctly populated. Turning it on on
+                        systems with filters already defined would result in those filters to be not read.
+                    </dd>
                 </dl>
 
             </subsection>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 02/05: JAMES-3777 EventSourcingFilteringManagement: allow to customize the read projection

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 776dd4cb6d63044b2cd62398448638b1345c106e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 14 13:46:40 2023 +0700

    JAMES-3777 EventSourcingFilteringManagement: allow to customize the read projection
---
 .../impl/EventSourcingFilteringManagement.java     | 74 ++++++++++++++++------
 1 file changed, 55 insertions(+), 19 deletions(-)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
index 8dfda3d241..8061428e2c 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/EventSourcingFilteringManagement.java
@@ -19,12 +19,14 @@
 
 package org.apache.james.jmap.api.filtering.impl;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
 
 import javax.inject.Inject;
 
 import org.apache.james.core.Username;
+import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.eventsourcing.eventstore.EventStore;
@@ -42,19 +44,67 @@ import com.google.common.collect.ImmutableSet;
 import reactor.core.publisher.Mono;
 
 public class EventSourcingFilteringManagement implements FilteringManagement {
+    public interface ReadProjection {
+        Publisher<Rules> listRulesForUser(Username username);
+
+        Publisher<Version> getLatestVersion(Username username);
+
+        Optional<Subscriber> subscriber();
+    }
+
+    public static class NoReadProjection implements ReadProjection {
+        private final EventStore eventStore;
+
+        @Inject
+        public NoReadProjection(EventStore eventStore) {
+            this.eventStore = eventStore;
+        }
+
+        @Override
+        public Publisher<Rules> listRulesForUser(Username username) {
+            Preconditions.checkNotNull(username);
+
+            FilteringAggregateId aggregateId = new FilteringAggregateId(username);
+
+            return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+                .map(history -> FilteringAggregate.load(aggregateId, history).listRules())
+                .defaultIfEmpty(new Rules(ImmutableList.of(), Version.INITIAL));
+        }
+
+        @Override
+        public Publisher<Version> getLatestVersion(Username username) {
+            Preconditions.checkNotNull(username);
+
+            FilteringAggregateId aggregateId = new FilteringAggregateId(username);
+
+            return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
+                .map(History::getVersionAsJava)
+                .map(eventIdOptional -> eventIdOptional.map(eventId -> new Version(eventId.value()))
+                    .orElse(Version.INITIAL));
+        }
+
+        @Override
+        public Optional<Subscriber> subscriber() {
+            return Optional.empty();
+        }
+    }
 
     private static final ImmutableSet<Subscriber> NO_SUBSCRIBER = ImmutableSet.of();
 
-    private final EventStore eventStore;
+    private final ReadProjection readProjection;
     private final EventSourcingSystem eventSourcingSystem;
 
     @Inject
     public EventSourcingFilteringManagement(EventStore eventStore) {
+        this(eventStore, new NoReadProjection(eventStore));
+    }
+
+    public EventSourcingFilteringManagement(EventStore eventStore, ReadProjection readProjection) {
+        this.readProjection = new NoReadProjection(eventStore);
         this.eventSourcingSystem = EventSourcingSystem.fromJava(
             ImmutableSet.of(new DefineRulesCommandHandler(eventStore)),
-            NO_SUBSCRIBER,
+            readProjection.subscriber().map(ImmutableSet::of).orElse(NO_SUBSCRIBER),
             eventStore);
-        this.eventStore = eventStore;
     }
 
     @Override
@@ -68,25 +118,11 @@ public class EventSourcingFilteringManagement implements FilteringManagement {
 
     @Override
     public Publisher<Rules> listRulesForUser(Username username) {
-        Preconditions.checkNotNull(username);
-
-        FilteringAggregateId aggregateId = new FilteringAggregateId(username);
-
-        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
-            .map(history -> FilteringAggregate.load(aggregateId, history).listRules())
-            .defaultIfEmpty(new Rules(ImmutableList.of(), Version.INITIAL));
+        return readProjection.listRulesForUser(username);
     }
 
     @Override
     public Publisher<Version> getLatestVersion(Username username) {
-        Preconditions.checkNotNull(username);
-
-        FilteringAggregateId aggregateId = new FilteringAggregateId(username);
-
-        return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
-            .map(History::getVersionAsJava)
-            .map(eventIdOptional -> eventIdOptional.map(eventId -> new Version(eventId.value()))
-                .orElse(Version.INITIAL));
+        return readProjection.getLatestVersion(username);
     }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org