You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2023/04/24 08:50:32 UTC

[james-project] branch master updated: JAMES-3777 EventSource subscriber should work with immutable state copy

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 395dad3c32 JAMES-3777 EventSource subscriber should work with immutable state copy
395dad3c32 is described below

commit 395dad3c3286185ca55c9f694448158452bc6061
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 21 18:15:18 2023 +0700

    JAMES-3777 EventSource subscriber should work with immutable state copy
    
    This ease work of subscriber and prevent some reads
---
 .../apache/james/eventsourcing/CommandHandler.java |  2 +-
 .../james/eventsourcing/CommandDispatcher.scala    |  8 ++-
 .../org/apache/james/eventsourcing/EventBus.scala  | 14 ++---
 .../apache/james/eventsourcing/Subscriber.scala    | 10 +--
 .../eventsourcing/DataCollectorSubscriber.scala    |  2 +-
 .../eventsourcing/EventSourcingSystemTest.scala    | 13 ++--
 .../org/apache/james/eventsourcing/Event.scala     | 10 ++-
 .../james/eventsourcing/eventstore/History.scala   |  4 +-
 .../mail/eventsourcing/acl/AclV2DAOSubscriber.java |  4 +-
 .../eventsourcing/acl/DeleteMailboxCommand.java    |  4 +-
 .../eventsourcing/acl/MailboxACLAggregate.java     | 25 +++++---
 .../mail/eventsourcing/acl/SetACLCommand.java      |  4 +-
 .../mail/eventsourcing/acl/UpdateACLCommand.java   |  4 +-
 .../eventsourcing/acl/UserRightsDAOSubscriber.java |  4 +-
 .../mailing/aggregates/UserQuotaThresholds.java    |  9 ++-
 .../commands/DetectThresholdCrossingHandler.java   |  4 +-
 .../mailing/subscribers/QuotaThresholdMailer.java  |  4 +-
 .../RegisterStorageStrategyCommandHandler.java     |  4 +-
 .../validation/StorageStrategyAggregate.java       |  6 +-
 .../filtering/CassandraFilteringProjection.java    | 73 ++++++++++++----------
 .../filtering/impl/DefineRulesCommandHandler.java  |  5 +-
 .../api/filtering/impl/FilteringAggregate.java     | 41 +++++++-----
 .../aggregates/DLPDomainConfiguration.java         | 16 ++---
 .../commands/ClearCommandHandler.java              |  4 +-
 .../commands/StoreCommandHandler.java              |  4 +-
 .../data/jmap/PopulateFilteringProjectionTask.java |  8 ++-
 ...pulateFilteringProjectionRequestToTaskTest.java | 20 ++----
 .../configuration/ConfigurationAggregate.java      |  9 +--
 .../RegisterConfigurationCommandHandler.java       |  4 +-
 .../james/task/eventsourcing/CommandHandlers.scala | 31 +++++----
 .../eventsourcing/EventSourcingTaskManager.scala   |  2 +-
 .../TaskExecutionDetailsProjection.scala           |  2 +-
 .../task/eventsourcing/TerminationSubscriber.scala |  6 +-
 .../TerminationSubscriberContract.java             |  2 +
 34 files changed, 204 insertions(+), 158 deletions(-)

diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
index dd690832f0..129cefdc58 100644
--- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
+++ b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
@@ -25,5 +25,5 @@ import org.reactivestreams.Publisher;
 public interface CommandHandler<C extends Command> {
   Class<C> handledClass();
 
-  Publisher<List<? extends Event>> handle(C command);
+  Publisher<List<EventWithState>> handle(C command);
 }
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
index ce2e40c4d8..14f45df78b 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
@@ -19,8 +19,9 @@
 package org.apache.james.eventsourcing
 
 import java.util
-import com.google.common.base.Preconditions
 
+import com.google.common.base.Preconditions
+import com.google.common.collect.ImmutableList
 import javax.inject.Inject
 import org.apache.james.eventsourcing.eventstore.EventStoreFailedException
 import org.reactivestreams.Publisher
@@ -74,13 +75,14 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl
     handleCommand(c) match {
       case Some(eventsPublisher) =>
         SMono(eventsPublisher)
-          .flatMap(events => eventBus.publish(events.asScala).`then`(SMono.just(events)))
+          .flatMap(events => eventBus.publish(events.asScala)
+            .`then`(SMono.just(events.asScala.map(_.event).asJava)))
       case _ =>
         SMono.error(CommandDispatcher.UnknownCommandException(c))
     }
   }
 
-  private def handleCommand(c: Command): Option[Publisher[util.List[_ <: Event]]] = {
+  private def handleCommand(c: Command): Option[Publisher[util.List[EventWithState]]] = {
     handlersByClass
       .get(c.getClass)
       .map(commandHandler =>
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
index ad1e310e9b..1f189cf0ca 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
@@ -30,20 +30,20 @@ object EventBus {
 
 class EventBus @Inject() (eventStore: EventStore, subscribers: Set[Subscriber]) {
   @throws[EventStoreFailedException]
-  def publish(events: Iterable[Event]): SMono[Void] =
-    SMono(eventStore.appendAll(events))
+  def publish(events: Iterable[EventWithState]): SMono[Void] =
+    SMono(eventStore.appendAll(events.map(_.event)))
         .`then`(runHandlers(events))
 
-  private def runHandlers(events: Iterable[Event]): SMono[Void] =
-    SFlux.fromIterable(events.flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber))))
+  private def runHandlers(events: Iterable[EventWithState]): SMono[Void] =
+    SFlux.fromIterable(events.flatMap((commandExecuted: EventWithState) => subscribers.map(subscriber => (commandExecuted, subscriber))))
       .concatMap(infos => runHandler(infos._1, infos._2))
       .`then`()
       .`then`(SMono.empty)
 
-  private def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] =
-    SMono(ReactiveSubscriber.asReactiveSubscriber(subscriber).handleReactive(event))
+  private def runHandler(commandExecuted: EventWithState, subscriber: Subscriber): Publisher[Void] =
+    SMono(ReactiveSubscriber.asReactiveSubscriber(subscriber).handleReactive(commandExecuted))
       .onErrorResume(e => {
-        EventBus.LOGGER.error("Error while calling {} for {}", subscriber, event, e)
+        EventBus.LOGGER.error("Error while calling {} for {}", subscriber, commandExecuted, e)
         SMono.empty
       })
 }
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
index 4b823c417f..1acd9000c3 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
@@ -23,13 +23,13 @@ import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.SMono
 
 trait Subscriber {
-  def handle(event: Event) : Unit
+  def handle(commandExecuted: EventWithState) : Unit
 }
 
 trait ReactiveSubscriber extends Subscriber {
-  def handleReactive(event: Event): Publisher[Void]
+  def handleReactive(commandExecuted: EventWithState): Publisher[Void]
 
-  override def handle(event: Event) : Unit = SMono(handleReactive(event)).block()
+  override def handle(commandExecuted: EventWithState) : Unit = SMono(handleReactive(commandExecuted)).block()
 }
 
 object ReactiveSubscriber {
@@ -40,9 +40,9 @@ object ReactiveSubscriber {
 }
 
 class ReactiveSubscriberWrapper(delegate: Subscriber) extends ReactiveSubscriber {
-  override def handle(event: Event) : Unit = delegate.handle(event)
+  override def handle(commandExecuted: EventWithState) : Unit = delegate.handle(commandExecuted)
 
-  def handleReactive(event: Event): Publisher[Void] = SMono.fromCallable(() => handle(event))
+  def handleReactive(commandExecuted: EventWithState): Publisher[Void] = SMono.fromCallable(() => handle(commandExecuted))
     .subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
     .`then`()
 }
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala
index 58e3fc987a..2e0912f348 100644
--- a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala
+++ b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
 class DataCollectorSubscriber() extends Subscriber {
   private val data = new mutable.ListBuffer[String]
 
-  override def handle(event: Event): Unit = event match {
+  override def handle(event: EventWithState): Unit = event.event match {
     case event: TestEvent => data += event.getData
   }
 
diff --git a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
index 55559074c9..e658841c97 100644
--- a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
+++ b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
@@ -73,7 +73,7 @@ trait EventSourcingSystemTest {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(
       Set(simpleDispatcher(eventStore)),
-      Set((_: Event) => throw new RuntimeException, subscriber),
+      Set((_: EventWithState) => throw new RuntimeException, subscriber),
       eventStore)
     Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
     assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1)
@@ -87,7 +87,7 @@ trait EventSourcingSystemTest {
     val subscriber = new DataCollectorSubscriber
     val eventSourcingSystem = new EventSourcingSystem(
       Set(simpleDispatcher(eventStore)),
-      Set((_: Event) => throw new RuntimeException, subscriber),
+      Set((_: EventWithState) => throw new RuntimeException, subscriber),
       eventStore)
     assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block())
       .isInstanceOf(classOf[RuntimeException])
@@ -151,22 +151,23 @@ trait EventSourcingSystemTest {
   def simpleDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() {
     override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand]
 
-    override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = {
+    override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[EventWithState]] = {
       SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
-          .map(history => Seq(TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload)).asJava)
+          .map(history => Seq(EventWithState.noState(
+            TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload))).asJava)
     }
   }
 
   def wordCuttingDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() {
     override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand]
 
-    override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = {
+    override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[EventWithState]] = {
       SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
         .map(history => new EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId))
         .map(eventIdIncrementer => Splitter.on(" ").splitToList(myCommand.getPayload)
           .asScala
           .toList
-          .map((word: String) => TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word)).asJava)
+          .map((word: String) => EventWithState.noState(TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word))).asJava)
     }
   }
 }
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
index 4fbb7f8215..82e295ec34 100644
--- a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
+++ b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
@@ -36,4 +36,12 @@ trait Event extends Comparable[Event] {
   def isASnapshot: Boolean = false
 
   override def compareTo(o: Event): Int = eventId.compareTo(o.eventId)
-}
\ No newline at end of file
+}
+
+trait ImmutableState
+
+object EventWithState {
+  def noState(event: Event): EventWithState = EventWithState(event, None)
+}
+
+case class EventWithState(event: Event, state: Option[ImmutableState])
\ No newline at end of file
diff --git a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala
index 9eeff9d076..d55c2e2958 100644
--- a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala
+++ b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala
@@ -52,9 +52,9 @@ final case class History private(events: List[Event]) {
 
   def getVersionAsJava: Optional[EventId] = getVersion.toJava
 
-  def getEvents:List[Event] = events
+  def getEvents: List[Event] = events
 
-  def getEventsJava:java.util.List[Event] = events.asJava
+  def getEventsJava: java.util.List[Event] = events.asJava
 
   def getNextEventId: EventId = getVersion
     .map(eventId => eventId.next)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
index 1b3ecada56..d338858138 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
 
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.ReactiveSubscriber;
 import org.apache.james.mailbox.cassandra.mail.CassandraACLDAOV2;
 
@@ -34,7 +35,8 @@ public class AclV2DAOSubscriber implements ReactiveSubscriber {
     }
 
     @Override
-    public Mono<Void> handleReactive(Event event) {
+    public Mono<Void> handleReactive(EventWithState eventWithState) {
+        Event event = eventWithState.event();
         if (event instanceof ACLUpdated) {
             ACLUpdated aclUpdated = (ACLUpdated) event;
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java
index 55c2af9d0f..feec9c4646 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
 import java.util.List;
 
 import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.reactivestreams.Publisher;
 
@@ -42,7 +42,7 @@ public class DeleteMailboxCommand implements Command {
         }
 
         @Override
-        public Publisher<List<? extends Event>> handle(DeleteMailboxCommand command) {
+        public Publisher<List<EventWithState>> handle(DeleteMailboxCommand command) {
             return Mono.from(eventStore.getEventsOfAggregate(command.getId()))
                 .map(history -> MailboxACLAggregate.load(command.getId(), history))
                 .map(MailboxACLAggregate::deleteMailbox);
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java
index 79ca0d5098..f419455e6b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java
@@ -23,6 +23,7 @@ import java.util.List;
 import java.util.Optional;
 
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.History;
 import org.apache.james.mailbox.acl.ACLDiff;
 import org.apache.james.mailbox.exception.UnsupportedRightException;
@@ -65,21 +66,27 @@ public class MailboxACLAggregate {
             .forEach(this::apply);
     }
 
-    public List<Event> deleteMailbox() {
-        return ImmutableList.of(new ACLUpdated(aggregateId, history.getNextEventId(),
-            ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), MailboxACL.EMPTY)));
+    public List<EventWithState> deleteMailbox() {
+        ACLUpdated event = new ACLUpdated(aggregateId, history.getNextEventId(),
+            ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), MailboxACL.EMPTY));
+        apply(event);
+        return ImmutableList.of(EventWithState.noState(event));
     }
 
-    public List<Event> set(SetACLCommand setACLCommand) {
-        return ImmutableList.of(new ACLUpdated(aggregateId, history.getNextEventId(),
-            ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), setACLCommand.getAcl())));
+    public List<EventWithState> set(SetACLCommand setACLCommand) {
+        ACLUpdated event = new ACLUpdated(aggregateId, history.getNextEventId(),
+            ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), setACLCommand.getAcl()));
+        apply(event);
+        return ImmutableList.of(EventWithState.noState(event));
     }
 
-    public List<Event> update(UpdateACLCommand command) throws UnsupportedRightException {
+    public List<EventWithState> update(UpdateACLCommand command) throws UnsupportedRightException {
         MailboxACL oldACL = state.acl.orElse(MailboxACL.EMPTY);
-        return ImmutableList.of(new ACLUpdated(command.getId(), history.getNextEventId(),
+        ACLUpdated event = new ACLUpdated(command.getId(), history.getNextEventId(),
             ACLDiff.computeDiff(oldACL,
-                oldACL.apply(command.getAclCommand()))));
+                oldACL.apply(command.getAclCommand())));
+        apply(event);
+        return ImmutableList.of(EventWithState.noState(event));
     }
 
     private void apply(Event event) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java
index 8b797f5c97..a0f44b3659 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
 import java.util.List;
 
 import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.reactivestreams.Publisher;
@@ -43,7 +43,7 @@ public class SetACLCommand implements Command {
         }
 
         @Override
-        public Publisher<List<? extends Event>> handle(SetACLCommand command) {
+        public Publisher<List<EventWithState>> handle(SetACLCommand command) {
             return Mono.from(eventStore.getEventsOfAggregate(command.getId()))
                 .map(history -> MailboxACLAggregate.load(command.getId(), history))
                 .map(aggregate -> aggregate.set(command));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java
index 607657fb36..b5222b6729 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
 import java.util.List;
 
 import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.mailbox.model.MailboxACL;
 import org.reactivestreams.Publisher;
@@ -45,7 +45,7 @@ public class UpdateACLCommand implements Command {
         }
 
         @Override
-        public Publisher<List<? extends Event>> handle(UpdateACLCommand command) {
+        public Publisher<List<EventWithState>> handle(UpdateACLCommand command) {
             return Mono.from(eventStore.getEventsOfAggregate(command.getId()))
                 .map(history -> MailboxACLAggregate.load(command.getId(), history))
                 .map(Throwing.function(aggregate -> aggregate.update(command)));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
index 32eec22492..1e76ad5e0d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
 
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.ReactiveSubscriber;
 import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
 import org.reactivestreams.Publisher;
@@ -34,7 +35,8 @@ public class UserRightsDAOSubscriber implements ReactiveSubscriber {
     }
 
     @Override
-    public Publisher<Void> handleReactive(Event event) {
+    public Publisher<Void> handleReactive(EventWithState eventWithState) {
+        Event event = eventWithState.event();
         if (event instanceof ACLUpdated) {
             ACLUpdated aclUpdated = (ACLUpdated) event;
             return userRightsDAO.update(aclUpdated.mailboxId(), aclUpdated.getAclDiff());
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java
index 59d0e0d168..47f826047a 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java
@@ -32,6 +32,7 @@ import org.apache.james.core.quota.QuotaCountUsage;
 import org.apache.james.core.quota.QuotaSizeLimit;
 import org.apache.james.core.quota.QuotaSizeUsage;
 import org.apache.james.eventsourcing.AggregateId;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.History;
 import org.apache.james.mailbox.model.Quota;
 import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
@@ -131,8 +132,8 @@ public class UserQuotaThresholds {
             .collect(Collectors.toList());
     }
 
-    public List<QuotaThresholdChangedEvent> detectThresholdCrossing(QuotaMailingListenerConfiguration configuration,
-                                                                    DetectThresholdCrossing command) {
+    public List<EventWithState> detectThresholdCrossing(QuotaMailingListenerConfiguration configuration,
+                                                        DetectThresholdCrossing command) {
 
         List<QuotaThresholdChangedEvent> events = generateEvents(
             configuration.getThresholds(),
@@ -141,7 +142,9 @@ public class UserQuotaThresholds {
             command.getSizeQuota(),
             command.getInstant());
         events.forEach(this::apply);
-        return events;
+        return events.stream()
+            .map(EventWithState::noState)
+            .collect(ImmutableList.toImmutableList());
     }
 
     private List<QuotaThresholdChangedEvent> generateEvents(QuotaThresholds configuration, Duration gracePeriod, Quota<QuotaCountLimit, QuotaCountUsage> countQuota, Quota<QuotaSizeLimit, QuotaSizeUsage> sizeQuota, Instant now) {
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
index 551644c31f..cd66c3208e 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.quota.mailing.commands;
 import java.util.List;
 
 import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
 import org.apache.james.mailbox.quota.mailing.aggregates.UserQuotaThresholds;
@@ -43,7 +43,7 @@ public class DetectThresholdCrossingHandler implements CommandHandler<DetectThre
     }
 
     @Override
-    public Publisher<List<? extends Event>> handle(DetectThresholdCrossing command) {
+    public Publisher<List<EventWithState>> handle(DetectThresholdCrossing command) {
         return loadAggregate(command)
             .map(aggregate -> aggregate.detectThresholdCrossing(quotaMailingListenerConfiguration, command));
     }
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java
index ddd06ffa16..117e130e6c 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java
@@ -27,6 +27,7 @@ import javax.mail.MessagingException;
 import org.apache.james.core.MailAddress;
 import org.apache.james.core.Username;
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.Subscriber;
 import org.apache.james.filesystem.api.FileSystem;
 import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
@@ -52,7 +53,8 @@ public class QuotaThresholdMailer implements Subscriber {
     }
 
     @Override
-    public void handle(Event event) {
+    public void handle(EventWithState eventWithState) {
+        Event event = eventWithState.event();
         if (event instanceof QuotaThresholdChangedEvent) {
             handleEvent((QuotaThresholdChangedEvent) event);
         }
diff --git a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java
index ddd69ff9f9..9f8d507609 100644
--- a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java
+++ b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java
@@ -23,7 +23,7 @@ import java.util.List;
 
 import org.apache.james.eventsourcing.AggregateId;
 import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.reactivestreams.Publisher;
 
@@ -45,7 +45,7 @@ public class RegisterStorageStrategyCommandHandler implements CommandHandler<Reg
     }
 
     @Override
-    public Publisher<List<? extends Event>> handle(RegisterStorageStrategy command) {
+    public Publisher<List<EventWithState>> handle(RegisterStorageStrategy command) {
         return Mono.from(eventStore.getEventsOfAggregate(AGGREGATE_ID))
             .map(history -> StorageStrategyAggregate.load(AGGREGATE_ID, history))
             .map(aggregate -> aggregate.registerStorageStrategy(command));
diff --git a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java
index e949bd46b3..1d1b8f6356 100644
--- a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java
+++ b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java
@@ -27,6 +27,7 @@ import java.util.Optional;
 
 import org.apache.james.eventsourcing.AggregateId;
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.History;
 import org.apache.james.server.blob.deduplication.StorageStrategy;
 
@@ -81,7 +82,7 @@ public class StorageStrategyAggregate {
             .forEach(this::apply);
     }
 
-    public List<Event> registerStorageStrategy(RegisterStorageStrategy command) {
+    public List<EventWithState> registerStorageStrategy(RegisterStorageStrategy command) {
         if (state.holds(command.getStorageStrategy())) {
             return ImmutableList.of();
         }
@@ -93,7 +94,8 @@ public class StorageStrategyAggregate {
                     state.getStorageStrategy()));
         }
 
-        return ImmutableList.of(new StorageStrategyChanged(history.getNextEventId(), aggregateId, command.getStorageStrategy()));
+        return ImmutableList.of(EventWithState.noState(
+            new StorageStrategyChanged(history.getNextEventId(), aggregateId, command.getStorageStrategy())));
     }
 
     private void apply(Event event) {
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
index 486dddf655..0282d7c683 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
@@ -1,3 +1,22 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
 package org.apache.james.jmap.cassandra.filtering;
 
 import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
@@ -19,14 +38,14 @@ import org.apache.james.core.Username;
 import org.apache.james.eventsourcing.AggregateId;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.ReactiveSubscriber;
 import org.apache.james.jmap.api.filtering.Rule;
 import org.apache.james.jmap.api.filtering.Rules;
 import org.apache.james.jmap.api.filtering.Version;
 import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.jmap.api.filtering.impl.FilteringAggregate;
 import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
-import org.apache.james.jmap.api.filtering.impl.IncrementalRuleChange;
-import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
 import org.reactivestreams.Publisher;
 
 import com.datastax.oss.driver.api.core.CqlSession;
@@ -39,7 +58,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Mono;
 
-public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection {
+public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber {
     private final CassandraAsyncExecutor executor;
 
     private final PreparedStatement insertStatement;
@@ -89,39 +108,25 @@ public class CassandraFilteringProjection implements EventSourcingFilteringManag
 
     @Override
     public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
-        return Optional.of(new ReactiveSubscriber() {
-            @Override
-            public Publisher<Void> handleReactive(Event event) {
-                if (event instanceof RuleSetDefined) {
-                    return persist((RuleSetDefined) event);
-                }
-                if (event instanceof IncrementalRuleChange) {
-                    return persist((IncrementalRuleChange) event);
-                }
-                throw new RuntimeException("Unsupported event");
-            }
+        return Optional.of(this);
+    }
 
-            private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
-                return persistRules(ruleSetDefined.getAggregateId(), ruleSetDefined.eventId(), ruleSetDefined.getRules());
-            }
+    @Override
+    public Publisher<Void> handleReactive(EventWithState eventWithState) {
+        Event event = eventWithState.event();
+        FilteringAggregate.FilterState state = (FilteringAggregate.FilterState) eventWithState.state().get();
+        return persistRules(event.getAggregateId(), event.eventId(), state.getRules());
+    }
 
-            private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
-                try {
-                    return executor.executeVoid(insertStatement.bind()
-                        .setString(AGGREGATE_ID, aggregateId.asAggregateKey())
-                        .setInt(EVENT_ID, eventId.value())
-                        .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules))));
-                } catch (JsonProcessingException e) {
-                    return Mono.error(e);
-                }
-            }
-
-            private Mono<Void> persist(IncrementalRuleChange incrementalRuleChange) {
-                FilteringAggregateId filteringAggregateId = (FilteringAggregateId) incrementalRuleChange.getAggregateId();
-                return Mono.from(ruleLoader.apply(filteringAggregateId.getUsername()))
-                    .flatMap(rules -> persistRules(filteringAggregateId, incrementalRuleChange.eventId(), ImmutableList.copyOf(rules.getRules())));
-            }
-        });
+    private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
+        try {
+            return executor.executeVoid(insertStatement.bind()
+                .setString(AGGREGATE_ID, aggregateId.asAggregateKey())
+                .setInt(EVENT_ID, eventId.value())
+                .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules))));
+        } catch (JsonProcessingException e) {
+            return Mono.error(e);
+        }
     }
 
     private Version parseVersion(Row row) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
index ac64b5fe32..252452f77c 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
@@ -22,14 +22,13 @@ package org.apache.james.jmap.api.filtering.impl;
 import java.util.List;
 
 import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.reactivestreams.Publisher;
 
 import reactor.core.publisher.Mono;
 
 public class DefineRulesCommandHandler implements CommandHandler<DefineRulesCommand> {
-
     private final EventStore eventStore;
 
     public DefineRulesCommandHandler(EventStore eventStore) {
@@ -42,7 +41,7 @@ public class DefineRulesCommandHandler implements CommandHandler<DefineRulesComm
     }
 
     @Override
-    public Publisher<List<? extends Event>> handle(DefineRulesCommand storeCommand) {
+    public Publisher<List<EventWithState>> handle(DefineRulesCommand storeCommand) {
         FilteringAggregateId aggregateId = new FilteringAggregateId(storeCommand.getUsername());
 
         return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
index 2828613713..d032f85c70 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
@@ -24,6 +24,8 @@ import java.util.Optional;
 
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
+import org.apache.james.eventsourcing.ImmutableState;
 import org.apache.james.eventsourcing.eventstore.History;
 import org.apache.james.jmap.api.exception.StateMismatchException;
 import org.apache.james.jmap.api.filtering.Rule;
@@ -33,6 +35,8 @@ import org.apache.james.jmap.api.filtering.Version;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import scala.Some;
+
 public class FilteringAggregate {
     private static final boolean ENABLE_INCREMENTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.increments.enabled", "true"));
     private static final boolean ENABLE_SNAPSHOTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.snapshots.enabled", "true"));
@@ -58,6 +62,18 @@ public class FilteringAggregate {
         }
     }
 
+    public static class FilterState implements ImmutableState {
+        private final ImmutableList<Rule> rules;
+
+        public FilterState(ImmutableList<Rule> rules) {
+            this.rules = rules;
+        }
+
+        public ImmutableList<Rule> getRules() {
+            return rules;
+        }
+    }
+
     private final FilteringAggregateId aggregateId;
     private final History history;
     private State state;
@@ -69,33 +85,30 @@ public class FilteringAggregate {
         this.history = history;
     }
 
-    public List<? extends Event> defineRules(DefineRulesCommand storeCommand) {
+    public List<EventWithState> defineRules(DefineRulesCommand storeCommand) {
         Preconditions.checkArgument(shouldNotContainDuplicates(storeCommand.getRules()));
         StateMismatchException.checkState(expectedState(storeCommand.getIfInState()), "Provided state must be as same as the current state");
-        ImmutableList<Event> events = generateEvents(storeCommand);
-        events.forEach(this::apply);
-        return events;
+        Event event = generateEvent(storeCommand);
+        apply(event);
+        return ImmutableList.of(new EventWithState(event, Some.apply(new FilterState(state.rules))));
     }
 
-    private ImmutableList<Event> generateEvents(DefineRulesCommand storeCommand) {
+
+    private Event generateEvent(DefineRulesCommand storeCommand) {
         EventId nextEventId = history.getNextEventId();
         if (ENABLE_INCREMENTS) {
             // SNAPSHOT periodically
             if (ENABLE_SNAPSHOTS && history.getEvents().size() >= 100) {
-                return resetRules(storeCommand, nextEventId);
+                return new RuleSetDefined(aggregateId, nextEventId, ImmutableList.copyOf(storeCommand.getRules()));
             }
-            return IncrementalRuleChange.ofDiff(aggregateId, nextEventId, state.rules, storeCommand.getRules())
-                .map(ImmutableList::<Event>of)
-                .orElseGet(() -> resetRules(storeCommand, nextEventId));
+            return IncrementalRuleChange.ofDiff(aggregateId, history.getNextEventId(), state.rules, storeCommand.getRules())
+                .map(Event.class::cast)
+                .orElseGet(() -> new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules())));
         } else {
-            return resetRules(storeCommand, nextEventId);
+            return new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules()));
         }
     }
 
-    private ImmutableList<Event> resetRules(DefineRulesCommand storeCommand, EventId nextEventId) {
-        return ImmutableList.of(new RuleSetDefined(aggregateId, nextEventId, ImmutableList.copyOf(storeCommand.getRules())));
-    }
-
     private boolean shouldNotContainDuplicates(List<Rule> rules) {
         long uniqueIdCount = rules.stream()
             .map(Rule::getId)
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java
index b11d19167a..b8dd786355 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java
@@ -30,6 +30,7 @@ import org.apache.james.dlp.eventsourcing.events.ConfigurationItemsAdded;
 import org.apache.james.dlp.eventsourcing.events.ConfigurationItemsRemoved;
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.History;
 
 import com.google.common.collect.ImmutableList;
@@ -80,30 +81,31 @@ public class DLPDomainConfiguration {
         return new DLPRules(ImmutableList.copyOf(state.rules));
     }
 
-    public List<Event> clear() {
+    public List<EventWithState> clear() {
         ImmutableList<DLPConfigurationItem> rules = retrieveRules().getItems();
         if (!rules.isEmpty()) {
-            ImmutableList<Event> events = ImmutableList.of(new ConfigurationItemsRemoved(aggregateId, history.getNextEventId(), rules));
-            events.forEach(this::apply);
-            return events;
+            Event event = new ConfigurationItemsRemoved(aggregateId, history.getNextEventId(), rules);
+            apply(event);
+            return ImmutableList.of(EventWithState.noState(event));
         } else {
             return ImmutableList.of();
         }
     }
 
-    public List<Event> store(DLPRules updatedRules) {
+    public List<EventWithState> store(DLPRules updatedRules) {
         ImmutableSet<DLPConfigurationItem> existingRules = retrieveRules().getItems().stream().collect(ImmutableSet.toImmutableSet());
         ImmutableSet<DLPConfigurationItem> updatedRulesSet = ImmutableSet.copyOf(updatedRules);
 
         Optional<Event> removedRulesEvent = generateRemovedRulesEvent(existingRules, updatedRulesSet);
         Optional<Event> addedRulesEvent = generateAddedRulesEvent(existingRules, updatedRulesSet, computeNextEventId(removedRulesEvent));
 
-        ImmutableList<Event> events = Stream
+        ImmutableList<EventWithState> events = Stream
             .of(removedRulesEvent, addedRulesEvent)
             .flatMap(Optional::stream)
+            .map(EventWithState::noState)
             .collect(ImmutableList.toImmutableList());
 
-        events.forEach(this::apply);
+        events.forEach(e -> apply(e.event()));
         return events;
     }
 
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
index e9ed63e4e0..a617632829 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
 import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.reactivestreams.Publisher;
 
@@ -44,7 +44,7 @@ public class ClearCommandHandler implements CommandHandler<ClearCommand> {
     }
 
     @Override
-    public Publisher<List<? extends Event>> handle(ClearCommand clearCommand) {
+    public Publisher<List<EventWithState>> handle(ClearCommand clearCommand) {
         DLPAggregateId aggregateId = new DLPAggregateId(clearCommand.getDomain());
 
         return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
index 0d628d5bca..337bef2751 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
@@ -24,7 +24,7 @@ import java.util.List;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
 import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
 import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.reactivestreams.Publisher;
 
@@ -44,7 +44,7 @@ public class StoreCommandHandler implements CommandHandler<StoreCommand> {
     }
 
     @Override
-    public Publisher<List<? extends Event>> handle(StoreCommand storeCommand) {
+    public Publisher<List<EventWithState>> handle(StoreCommand storeCommand) {
         DLPAggregateId aggregateId = new DLPAggregateId(storeCommand.getDomain());
 
         return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
index 3d0bc6d2c5..1d0011b8f0 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
@@ -26,8 +26,10 @@ import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.james.core.Username;
 import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.jmap.api.filtering.Rules;
 import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.jmap.api.filtering.impl.FilteringAggregate;
 import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
 import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
 import org.apache.james.json.DTOModule;
@@ -43,6 +45,7 @@ import com.google.common.collect.ImmutableList;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import scala.Some;
 
 public class PopulateFilteringProjectionTask implements Task {
     static final TaskType TASK_TYPE = TaskType.of("PopulateFilteringProjectionTask");
@@ -148,8 +151,9 @@ public class PopulateFilteringProjectionTask implements Task {
             .block();
     }
 
-    private RuleSetDefined asEvent(Username user, Rules rules, EventId eventId) {
-        return new RuleSetDefined(new FilteringAggregateId(user), eventId, ImmutableList.copyOf(rules.getRules()));
+    private EventWithState asEvent(Username user, Rules rules, EventId eventId) {
+        return new EventWithState(new RuleSetDefined(new FilteringAggregateId(user), eventId, ImmutableList.copyOf(rules.getRules())),
+            Some.apply(new FilteringAggregate.FilterState(ImmutableList.copyOf(rules.getRules()))));
     }
 
     @Override
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
index 337d63f965..21546d1c5a 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
@@ -22,7 +22,6 @@ package org.apache.james.webadmin.data.jmap;
 import static io.restassured.RestAssured.given;
 import static io.restassured.RestAssured.when;
 import static io.restassured.RestAssured.with;
-import static javax.mail.Flags.Flag.DELETED;
 import static org.apache.james.jmap.api.filtering.Rule.Condition.Comparator.CONTAINS;
 import static org.apache.james.jmap.api.filtering.Rule.Condition.Field.SUBJECT;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -35,12 +34,10 @@ import static org.mockito.Mockito.verify;
 
 import java.util.Optional;
 
-import javax.mail.Flags;
-
 import org.apache.james.core.Username;
 import org.apache.james.domainlist.api.DomainList;
-import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.ReactiveSubscriber;
 import org.apache.james.jmap.api.filtering.Rule;
 import org.apache.james.jmap.api.filtering.Rules;
@@ -48,14 +45,10 @@ import org.apache.james.jmap.api.filtering.Version;
 import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
 import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
 import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
-import org.apache.james.jmap.memory.projections.MemoryEmailQueryView;
 import org.apache.james.json.DTOConverter;
 import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.MessageManager;
-import org.apache.james.mailbox.extension.PreDeletionHook;
 import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
 import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
-import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.task.Hostname;
@@ -63,7 +56,6 @@ import org.apache.james.task.MemoryTaskManager;
 import org.apache.james.task.TaskManager;
 import org.apache.james.user.api.UsersRepository;
 import org.apache.james.user.memory.MemoryUsersRepository;
-import org.apache.james.util.streams.Limit;
 import org.apache.james.webadmin.Routes;
 import org.apache.james.webadmin.WebAdminServer;
 import org.apache.james.webadmin.WebAdminUtils;
@@ -238,13 +230,13 @@ class PopulateFilteringProjectionRequestToTaskTest {
             .basePath(TasksRoutes.BASE)
             .get(taskId + "/await");
 
-        ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+        ArgumentCaptor<EventWithState> captor = ArgumentCaptor.forClass(EventWithState.class);
         verify(subscriber, times(1)).handleReactive(captor.capture());
 
-        assertThat(captor.getValue().eventId()).isEqualTo(EventId.fromSerialized(4));
-        assertThat(captor.getValue().getAggregateId()).isEqualTo(new FilteringAggregateId(BOB));
-        assertThat(captor.getValue()).isInstanceOf(RuleSetDefined.class);
-        RuleSetDefined ruleSetDefined = (RuleSetDefined) captor.getValue();
+        assertThat(captor.getValue().event().eventId()).isEqualTo(EventId.fromSerialized(4));
+        assertThat(captor.getValue().event().getAggregateId()).isEqualTo(new FilteringAggregateId(BOB));
+        assertThat(captor.getValue().event()).isInstanceOf(RuleSetDefined.class);
+        RuleSetDefined ruleSetDefined = (RuleSetDefined) captor.getValue().event();
         assertThat(ruleSetDefined.getRules()).containsOnly(rule);
     }
 }
\ No newline at end of file
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java
index 50f540718e..51e1cdcda7 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java
@@ -24,6 +24,7 @@ import java.util.Optional;
 
 import org.apache.james.eventsourcing.AggregateId;
 import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.History;
 
 import com.google.common.base.Preconditions;
@@ -54,7 +55,7 @@ class ConfigurationAggregate {
         return new ConfigurationAggregate(aggregateId, history);
     }
 
-    private static final List<? extends Event> EMPTY_EVENTS = ImmutableList.of();
+    private static final List<EventWithState> EMPTY_EVENTS = ImmutableList.of();
 
     private final AggregateId aggregateId;
     private final History history;
@@ -68,7 +69,7 @@ class ConfigurationAggregate {
         history.getEventsJava().forEach(this::apply);
     }
 
-    List<? extends Event> registerConfiguration(CassandraMailQueueViewConfiguration configuration) {
+    List<EventWithState> registerConfiguration(CassandraMailQueueViewConfiguration configuration) {
         boolean isSame = state.maybeConfiguration.map(configuration::equals).orElse(false);
         if (isSame) {
             return EMPTY_EVENTS;
@@ -76,10 +77,10 @@ class ConfigurationAggregate {
 
         state.maybeConfiguration.ifPresent(oldConfiguration -> oldConfiguration.validateConfigurationChange(configuration));
 
-        return ImmutableList.of(new ConfigurationChanged(
+        return ImmutableList.of(EventWithState.noState(new ConfigurationChanged(
             aggregateId,
             history.getNextEventId(),
-            configuration));
+            configuration)));
     }
 
     Optional<CassandraMailQueueViewConfiguration> getCurrentConfiguration() {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
index bf54102d8e..415e0b5af4 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
@@ -22,7 +22,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra.configuration;
 import java.util.List;
 
 import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.eventsourcing.eventstore.EventStore;
 import org.reactivestreams.Publisher;
 
@@ -42,7 +42,7 @@ class RegisterConfigurationCommandHandler implements CommandHandler<RegisterConf
     }
 
     @Override
-    public Publisher<List<? extends Event>> handle(RegisterConfigurationCommand command) {
+    public Publisher<List<EventWithState>> handle(RegisterConfigurationCommand command) {
         return Mono.from(eventStore.getEventsOfAggregate(command.getAggregateId()))
             .map(history -> ConfigurationAggregate
                 .load(command.getAggregateId(), history)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index de682c1018..e1c4ab8309 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -21,11 +21,10 @@ package org.apache.james.task.eventsourcing
 import java.util.List
 
 import org.apache.james.eventsourcing.eventstore.History
-import org.apache.james.eventsourcing.{CommandHandler, Event}
+import org.apache.james.eventsourcing.{CommandHandler, Event, EventWithState}
 import org.apache.james.task.eventsourcing.TaskCommand._
 import org.apache.james.task.{Hostname, TaskId}
 import org.reactivestreams.Publisher
-
 import reactor.core.scala.publisher.SMono
 
 import scala.jdk.CollectionConverters._
@@ -41,8 +40,8 @@ sealed abstract class TaskCommandHandler[T <: TaskCommand] extends CommandHandle
 class CreateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History], hostname: Hostname) extends TaskCommandHandler[Create] {
   override def handledClass: Class[Create] = classOf[Create]
 
-  override def handle(command: Create): Publisher[List[_ <: Event]] = {
-    SMono.fromCallable(() => TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname).asJava)
+  override def handle(command: Create): Publisher[List[EventWithState]] = {
+    SMono.fromCallable(() => TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname).map(EventWithState.noState).asJava)
   }
 }
 
@@ -50,8 +49,8 @@ class StartCommandHandler(private val loadHistory: TaskAggregateId => SMono[Hist
                           private val hostname: Hostname) extends TaskCommandHandler[Start] {
   override def handledClass: Class[Start] = classOf[Start]
 
-  override def handle(command: Start): Publisher[List[_ <: Event]] = {
-    loadAggregate(loadHistory, command.id).map(_.start(hostname).asJava)
+  override def handle(command: Start): Publisher[List[EventWithState]] = {
+    loadAggregate(loadHistory, command.id).map(_.start(hostname).map(EventWithState.noState).asJava)
   }
 }
 
@@ -59,39 +58,39 @@ class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => SM
                                   private val hostname: Hostname) extends TaskCommandHandler[RequestCancel] {
   override def handledClass: Class[RequestCancel] = classOf[RequestCancel]
 
-  override def handle(command: RequestCancel): Publisher[List[_ <: Event]] = {
-    loadAggregate(loadHistory, command.id).map(_.requestCancel(hostname).asJava)
+  override def handle(command: RequestCancel): Publisher[List[EventWithState]] = {
+    loadAggregate(loadHistory, command.id).map(_.requestCancel(hostname).map(EventWithState.noState).asJava)
   }
 }
 
 class CompleteCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Complete] {
   override def handledClass: Class[Complete] = classOf[Complete]
 
-  override def handle(command: Complete): Publisher[List[_ <: Event]] = {
-    loadAggregate(loadHistory, command.id).map(_.complete(command.result, command.additionalInformation).asJava)
+  override def handle(command: Complete): Publisher[List[EventWithState]] = {
+    loadAggregate(loadHistory, command.id).map(_.complete(command.result, command.additionalInformation).map(EventWithState.noState).asJava)
   }
 }
 
 class CancelCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Cancel] {
   override def handledClass: Class[Cancel] = classOf[Cancel]
 
-  override def handle(command: Cancel): Publisher[List[_ <: Event]] = {
-    loadAggregate(loadHistory, command.id).map(_.cancel(command.additionalInformation).asJava)
+  override def handle(command: Cancel): Publisher[List[EventWithState]] = {
+    loadAggregate(loadHistory, command.id).map(_.cancel(command.additionalInformation).map(EventWithState.noState).asJava)
   }
 }
 
 class FailCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Fail] {
   override def handledClass: Class[Fail] = classOf[Fail]
 
-  override def handle(command: Fail): Publisher[List[_ <: Event]] = {
-    loadAggregate(loadHistory, command.id).map(_.fail(command.additionalInformation, command.errorMessage, command.exception).asJava)
+  override def handle(command: Fail): Publisher[List[EventWithState]] = {
+    loadAggregate(loadHistory, command.id).map(_.fail(command.additionalInformation, command.errorMessage, command.exception).map(EventWithState.noState).asJava)
   }
 }
 
 class UpdateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[UpdateAdditionalInformation] {
   override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation]
 
-  override def handle(command: UpdateAdditionalInformation): Publisher[List[_ <: Event]] = {
-    loadAggregate(loadHistory, command.id).map(_.update(command.additionalInformation).asJava)
+  override def handle(command: UpdateAdditionalInformation): Publisher[List[EventWithState]] = {
+    loadAggregate(loadHistory, command.id).map(_.update(command.additionalInformation).map(EventWithState.noState).asJava)
   }
 }
\ No newline at end of file
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 3ea869dd8b..607e843f16 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -42,7 +42,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
                                                                                  val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable with Startable {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[EventSourcingTaskManager])
 
-  private def workDispatcher: Subscriber = {
+  private def workDispatcher: Subscriber = event => event.event match {
     case Created(aggregateId, _, task, _) =>
       val taskWithId = new TaskWithId(aggregateId.taskId, task)
       workQueue.submit(taskWithId)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 22fa7d64cf..5763eab41d 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -29,7 +29,7 @@ import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
 
 trait TaskExecutionDetailsProjection {
-  def asSubscriber(hostname: Hostname): ReactiveSubscriber = {
+  def asSubscriber(hostname: Hostname): ReactiveSubscriber = event => event.event match {
     case created: Created =>
       updateReactive(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname))
     case cancelRequested: CancelRequested =>
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
index 1dabb19bb9..2efdd917f7 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
@@ -19,15 +19,15 @@
 
 package org.apache.james.task.eventsourcing
 
-import org.apache.james.eventsourcing.{Event, ReactiveSubscriber}
+import org.apache.james.eventsourcing.{Event, EventWithState, ReactiveSubscriber}
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
 import reactor.core.publisher.{Mono, Sinks}
 
 trait TerminationSubscriber extends ReactiveSubscriber {
-  override def handleReactive(event: Event): Publisher[Void] = Mono.fromRunnable(() => handle(event))
+  override def handleReactive(event: EventWithState): Publisher[Void] = Mono.fromRunnable(() => handle(event))
 
-  override def handle(event: Event): Unit = event match {
+  override def handle(event: EventWithState): Unit = event.event match {
     case event: TerminalTaskEvent => addEvent(event)
     case _ =>
   }
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 1a2c801292..ec450342a6 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
 
 import org.apache.james.eventsourcing.Event;
 import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
 import org.apache.james.task.Hostname;
 import org.apache.james.task.Task;
 import org.apache.james.task.TaskId;
@@ -151,6 +152,7 @@ public interface TerminationSubscriberContract {
             .flatMapMany(ignored -> Flux.fromArray(events)
                 .subscribeOn(Schedulers.fromExecutor(EXECUTOR))
                 .delayElements(DELAY_BETWEEN_EVENTS)
+                .map(EventWithState::noState)
                 .doOnNext(subscriber::handle))
             .subscribe();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org