You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/21 14:25:55 UTC

[james-project] 01/02: JAMES-3777 Snapshots for Filter event sourcing

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 812875de7124ef524f7c8e19c0a0c40883244e44
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 19 14:15:57 2023 +0700

    JAMES-3777 Snapshots for Filter event sourcing
    
    Snapshots are stored as events. The offset of the event is
    stored in a static column.
    
    Non-invasive and easy to use.
---
 .../org/apache/james/eventsourcing/Event.scala     |  2 +
 .../eventstore/cassandra/CassandraEventStore.scala | 17 +++---
 .../cassandra/CassandraEventStoreModule.scala      |  1 +
 .../cassandra/CassandraEventStoreTable.scala       |  1 +
 .../eventstore/cassandra/EventStoreDao.scala       | 71 ++++++++++++++++++----
 ...CassandraEventStoreExtensionForTestEvents.scala |  2 +-
 .../cassandra/CassandraEventStoreTest.scala        | 37 ++++++++++-
 .../SnapshotEvent.scala}                           | 21 ++++---
 .../{TestEventDTO.scala => SnapshotEventDTO.scala} | 15 +++--
 .../eventstore/cassandra/dto/TestEventDTO.scala    |  7 +--
 .../cassandra/dto/TestEventDTOModules.scala        | 12 ++++
 .../sample-configuration/jvm.properties            |  5 +-
 .../sample-configuration/jvm.properties            |  3 +-
 .../api/filtering/impl/FilteringAggregate.java     | 16 ++++-
 .../jmap/api/filtering/impl/RuleSetDefined.java    |  5 ++
 15 files changed, 165 insertions(+), 50 deletions(-)

diff --git a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
index 5440ab03a2..4fbb7f8215 100644
--- a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
+++ b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
@@ -33,5 +33,7 @@ trait Event extends Comparable[Event] {
 
   def getAggregateId: AggregateId
 
+  def isASnapshot: Boolean = false // events are not snapshots unless an implementation overrides this
+
   override def compareTo(o: Event): Int = eventId.compareTo(o.eventId)
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
index bdd3f713f0..f05d54b2d3 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
@@ -20,25 +20,23 @@ package org.apache.james.eventsourcing.eventstore.cassandra
 
 import com.google.common.base.Preconditions
 import javax.inject.Inject
-
 import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException, History}
-import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.apache.james.eventsourcing.{AggregateId, Event, EventId}
 import org.reactivestreams.Publisher
-
 import reactor.core.scala.publisher.SMono
 
 class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends EventStore {
-  override def appendAll(events: Iterable[Event]): Publisher[Void] = {
+  override def appendAll(events: Iterable[Event]): Publisher[Void] = // empty batches complete without calling doAppendAll
     if (events.nonEmpty) {
       doAppendAll(events)
     } else {
       SMono.empty
     }
-  }
 
   private def doAppendAll(events: Iterable[Event]): SMono[Void] = {
     Preconditions.checkArgument(Event.belongsToSameAggregate(events))
-    eventStoreDao.appendAll(events)
+    val snapshotId = events.filter(_.isASnapshot).map(_.eventId).headOption
+    eventStoreDao.appendAll(events, snapshotId)
       .filter(success => success)
       .single()
       .onErrorMap({
@@ -48,9 +46,10 @@ class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends Event
       .`then`(SMono.empty)
   }
 
-  override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = {
-    eventStoreDao.getEventsOfAggregate(aggregateId)
-  }
+  override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = // history of the aggregate, resumed from its snapshot when one is recorded
+    eventStoreDao.getSnapshot(aggregateId) // look up the recorded snapshot event id first
+      .flatMap(snapshotId => eventStoreDao.getEventsOfAggregate(aggregateId, snapshotId)) // load events starting at the snapshot event
+      .switchIfEmpty(eventStoreDao.getEventsOfAggregate(aggregateId)) // no snapshot id emitted: load the full history
 
   override def remove(aggregateId: AggregateId): Publisher[Void] = eventStoreDao.delete(aggregateId)
 }
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala
index c92e350ba6..ad2a7eb2b5 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala
@@ -34,6 +34,7 @@ object CassandraEventStoreModule {
         SchemaBuilder.RowsPerPartition.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))
     .statement(statement => _ => statement.withPartitionKey(CassandraEventStoreTable.AGGREGATE_ID, DataTypes.TEXT)
       .withClusteringColumn(CassandraEventStoreTable.EVENT_ID, DataTypes.INT)
+      .withStaticColumn(CassandraEventStoreTable.SNAPSHOT, DataTypes.INT)
       .withColumn(CassandraEventStoreTable.EVENT, DataTypes.TEXT))
     .build
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala
index 57c6754976..f60fce1143 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala
@@ -25,4 +25,5 @@ object CassandraEventStoreTable {
   val AGGREGATE_ID = CqlIdentifier.fromCql("aggregateId")
   val EVENT = CqlIdentifier.fromCql("event")
   val EVENT_ID = CqlIdentifier.fromCql("eventId")
+  val SNAPSHOT = CqlIdentifier.fromCql("snapshot")
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
index b83bb626b2..39abbbac15 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
@@ -23,21 +23,25 @@ import com.datastax.oss.driver.api.core.CqlSession
 import com.datastax.oss.driver.api.core.`type`.codec.TypeCodecs
 import com.datastax.oss.driver.api.core.cql.{BatchStatementBuilder, BatchType, BoundStatement, PreparedStatement, Row, Statement}
 import com.datastax.oss.driver.api.querybuilder.QueryBuilder
-import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, insertInto}
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, insertInto, update}
 import javax.inject.Inject
 import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
 import org.apache.james.eventsourcing.eventstore.History
-import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID, EVENT, EVENTS_TABLE, EVENT_ID}
-import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID, EVENT, EVENTS_TABLE, EVENT_ID, SNAPSHOT}
+import org.apache.james.eventsourcing.{AggregateId, Event, EventId}
 import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.{SFlux, SMono}
 
 class EventStoreDao @Inject() (val session: CqlSession,
                                val jsonEventSerializer: JsonEventSerializer) {
   private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
   private val insert = prepareInsert(session)
+  private val insertSnapshot = prepareInsertSnapshot(session)
   private val select = prepareSelect(session)
+  private val selectFrom = prepareSelectFrom(session)
+  private val selectSnapshot = prepareSelectSnapshot(session)
   private val deleteByAggregateId = prepareDelete(session)
   private val executionProfile = JamesExecutionProfiles.getLWTProfile(session)
 
@@ -50,6 +54,13 @@ class EventStoreDao @Inject() (val session: CqlSession,
         .ifNotExists
         .build())
 
+  private def prepareInsertSnapshot(session: CqlSession): PreparedStatement = // prepares the statement that records the snapshot event id
+    session.prepare(
+      update(EVENTS_TABLE)
+        .setColumn(SNAPSHOT, bindMarker(SNAPSHOT)) // SNAPSHOT is bound by name when the statement is executed
+        .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID)) // addressed by aggregate id only, no event id
+        .build())
+
   private def prepareSelect(session: CqlSession): PreparedStatement =
     session.prepare(QueryBuilder
       .selectFrom(EVENTS_TABLE)
@@ -57,14 +68,34 @@ class EventStoreDao @Inject() (val session: CqlSession,
       .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
       .build())
 
+  private def prepareSelectSnapshot(session: CqlSession): PreparedStatement = // prepares the read of the recorded snapshot event id
+    session.prepare(QueryBuilder
+      .selectFrom(EVENTS_TABLE)
+      .column(SNAPSHOT) // only the snapshot column is selected
+      .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID)) // restricted to a single aggregate
+      .build())
+
+  private def prepareSelectFrom(session: CqlSession): PreparedStatement = // prepares the read of an aggregate's events from a given event id
+    session.prepare(QueryBuilder
+      .selectFrom(EVENTS_TABLE)
+      .column(EVENT) // only the serialized event payload is selected
+      .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
+      .whereColumn(EVENT_ID).isGreaterThanOrEqualTo(bindMarker(EVENT_ID)) // inclusive lower bound on the event id
+      .build())
+
   private def prepareDelete(session: CqlSession): PreparedStatement =
     session.prepare(QueryBuilder.deleteFrom(EVENTS_TABLE)
       .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
       .build())
 
-  private[cassandra] def appendAll(events: Iterable[Event]): SMono[Boolean] =
+  private[cassandra] def appendAll(events: Iterable[Event], lastSnapShot: Option[EventId]): SMono[Boolean] = // emits whether the append was applied; lastSnapShot is the snapshot event id to record, if any
     SMono(cassandraAsyncExecutor.executeReturnApplied(appendQuery(events))
       .map(_.booleanValue()))
+      .flatMap((success: Boolean) => lastSnapShot // runs once the append result is known
+        .filter(_ => success) // do not record a snapshot id when the append was not applied
+        .map(id => SMono(cassandraAsyncExecutor.executeVoid(insertSnapshot(events.head.getAggregateId, id)))) // separate statement issued after the append
+        .getOrElse(SMono.empty) // nothing to record
+        .`then`(SMono.just(success))) // emit the append outcome once the optional snapshot write completes
 
   private def appendQuery(events: Iterable[Event]): Statement[_] =
     if (events.size == 1)
@@ -82,18 +113,34 @@ class EventStoreDao @Inject() (val session: CqlSession,
       .setInt(EVENT_ID, event.eventId.serialize)
       .setString(EVENT, jsonEventSerializer.serialize(event))
 
-  private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = {
-    val preparedStatement = select.bind()
+  private def insertSnapshot(aggregateId: AggregateId, snapshotId: EventId): BoundStatement = { // binds the snapshot-recording statement for one aggregate
+    val aggregateKey = aggregateId.asAggregateKey // key identifying the aggregate in the events table
+    val snapshotOffset = snapshotId.serialize // event id of the snapshot event
+    insertSnapshot.bind().setString(AGGREGATE_ID, aggregateKey).setInt(SNAPSHOT, snapshotOffset)
+  }
+
+  private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = // history built from the `select` statement, with no event id bound
+    asHistory(cassandraAsyncExecutor.executeRows(select.bind()
+      .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT)
+      .setExecutionProfile(executionProfile))) // executionProfile is the LWT profile obtained from JamesExecutionProfiles
+
+  private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId, snapshotId: EventId): SMono[History] = // history restricted to events from snapshotId (inclusive) onwards
+    asHistory(cassandraAsyncExecutor.executeRows(selectFrom.bind()
       .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT)
-      .setExecutionProfile(executionProfile)
-    val rows: SFlux[Row] = SFlux[Row](cassandraAsyncExecutor.executeRows(preparedStatement))
+      .setInt(EVENT_ID, snapshotId.serialize) // same EventId accessor as the other bound statements of this DAO
+      .setExecutionProfile(executionProfile))) // executionProfile is the LWT profile obtained from JamesExecutionProfiles
 
-    val events: SFlux[Event] = rows.concatMap(toEvent)
-    val listEvents: SMono[List[Event]] = events.collectSeq()
+  private def asHistory(rows: Publisher[Row]): SMono[History] = // turns a stream of event rows into a single History
+    SFlux[Row](rows)
+      .concatMap(toEvent) // row-to-event conversion, keeping row order
+      .collectSeq() // gather every event before building the History
       .map(_.toList)
+      .map(History.of(_))
 
-    listEvents.map(History.of(_))
-  }
+  private[cassandra] def getSnapshot(aggregateId: AggregateId): SMono[EventId] = // emits the recorded snapshot event id; empty when none is recorded
+    SMono(cassandraAsyncExecutor.executeSingleRow(selectSnapshot.bind()
+      .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT)))
+      .filter(row => !row.isNull(0)).map(row => EventId.fromSerialized(row.get(0, TypeCodecs.INT))) // a null snapshot column means "no snapshot": stay empty instead of unboxing null
 
   def delete(aggregateId: AggregateId): SMono[Unit] =
     SMono(cassandraAsyncExecutor.executeVoid(deleteByAggregateId
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala
index 9b1b7a6e32..c7002efd5b 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala
@@ -21,4 +21,4 @@ package org.apache.james.eventsourcing.eventstore.cassandra
 import org.apache.james.eventsourcing.eventstore.cassandra.dto.TestEventDTOModules
 import org.junit.jupiter.api.extension.Extension
 
-class CassandraEventStoreExtensionForTestEvents extends CassandraEventStoreExtension(JsonEventSerializer.forModules(TestEventDTOModules.TEST_TYPE).withoutNestedType) with Extension
+class CassandraEventStoreExtensionForTestEvents extends CassandraEventStoreExtension(JsonEventSerializer.forModules(TestEventDTOModules.TEST_TYPE, TestEventDTOModules.SNAPSHOT_TYPE).withoutNestedType) with Extension
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
index 47c266ef4a..5d20fbeebe 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
@@ -18,8 +18,41 @@
  ****************************************************************/
 package org.apache.james.eventsourcing.eventstore.cassandra
 
-import org.apache.james.eventsourcing.eventstore.EventStoreContract
+import org.apache.james.eventsourcing.eventstore.cassandra.dto.SnapshotEvent
+import org.apache.james.eventsourcing.{EventId, TestEvent}
+import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreContract, History}
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
 import org.junit.jupiter.api.extension.ExtendWith
+import reactor.core.scala.publisher.SMono
 
 @ExtendWith(Array(classOf[CassandraEventStoreExtensionForTestEvents]))
-class CassandraEventStoreTest extends EventStoreContract
\ No newline at end of file
+class CassandraEventStoreTest extends EventStoreContract { // adds snapshot-specific cases on top of the shared EventStoreContract
+  @Test
+  def getEventsOfAggregateShouldResumeFromSnapshot(testee: EventStore) : Unit = { // events older than the snapshot must not be returned
+    val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first")
+    val event2 = SnapshotEvent(EventId.first.next, EventStoreContract.AGGREGATE_1, "second") // the snapshot event
+    val event3 = TestEvent(EventId.first.next.next, EventStoreContract.AGGREGATE_1, "third")
+
+    SMono(testee.append(event1)).block()
+    SMono(testee.append(event2)).block()
+    SMono(testee.append(event3)).block()
+
+    assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
+      .isEqualTo(History.of(event2, event3)) // event1 precedes the snapshot and is left out
+  }
+
+  @Test
+  def getEventsOfAggregateShouldResumeFromLatestSnapshot(testee: EventStore) : Unit = { // with two snapshots, the most recent one wins
+    val event1 = SnapshotEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first") // older snapshot
+    val event2 = TestEvent(EventId.first.next, EventStoreContract.AGGREGATE_1, "second")
+    val event3 = SnapshotEvent(EventId.first.next.next, EventStoreContract.AGGREGATE_1, "third") // most recent snapshot
+
+    SMono(testee.append(event1)).block()
+    SMono(testee.append(event2)).block()
+    SMono(testee.append(event3)).block()
+
+    assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
+      .isEqualTo(History.of(event3)) // history restarts at the latest snapshot
+  }
+}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala
similarity index 63%
copy from event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
copy to event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala
index 47c266ef4a..07f9cfa4bb 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala
@@ -1,4 +1,4 @@
- /***************************************************************
+/****************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
  *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
  *                                                              *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
@@ -15,11 +15,16 @@
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- ****************************************************************/
-package org.apache.james.eventsourcing.eventstore.cassandra
+ * ***************************************************************/
+package org.apache.james.eventsourcing.eventstore.cassandra.dto
 
-import org.apache.james.eventsourcing.eventstore.EventStoreContract
-import org.junit.jupiter.api.extension.ExtendWith
+import org.apache.james.eventsourcing.{Event, EventId, TestAggregateId}
 
-@ExtendWith(Array(classOf[CassandraEventStoreExtensionForTestEvents]))
-class CassandraEventStoreTest extends EventStoreContract
\ No newline at end of file
+final case class SnapshotEvent(override val eventId: EventId, aggregateId: TestAggregateId, payload: String) extends Event { // test event that reports itself as a snapshot
+
+  override def getAggregateId = aggregateId
+
+  def getPayload = payload // read by the SNAPSHOT_TYPE DTO converter
+
+  override val isASnapshot = true // overrides the Event default of false
+}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala
similarity index 73%
copy from event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
copy to event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala
index c3fa627064..dedec58367 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala
@@ -21,18 +21,17 @@ package org.apache.james.eventsourcing.eventstore.cassandra.dto
 import com.fasterxml.jackson.annotation.{JsonCreator, JsonIgnore, JsonProperty}
 import org.apache.james.eventsourcing.{EventId, TestAggregateId, TestEvent}
 
-final case class TestEventDTO @JsonCreator() ( @JsonProperty("type") `type`: String,
-                                    @JsonProperty("data") data: String,
-                                    @JsonProperty("eventId") eventId: Int,
-                                    @JsonProperty("aggregate") aggregate: Int) extends EventDTO {
-  override def getType: String = {
-  `type`
-}
+final case class SnapshotEventDTO @JsonCreator()(@JsonProperty("type") `type`: String, // JSON DTO for SnapshotEvent, with fields type, data, eventId and aggregate
+                                                 @JsonProperty("data") data: String,
+                                                 @JsonProperty("eventId") eventId: Int,
+                                                 @JsonProperty("aggregate") aggregate: Int) extends EventDTO {
+  override def getType: String = `type` // backticks needed because `type` is a Scala keyword
+
   def getData: String = data
 
   def getEventId: Long = eventId
 
   def getAggregate: Int = aggregate
 
-  @JsonIgnore def toEvent: TestEvent = new TestEvent(EventId.fromSerialized (eventId), TestAggregateId(aggregate), data)
+  @JsonIgnore def toEvent: SnapshotEvent = SnapshotEvent(EventId.fromSerialized(eventId), TestAggregateId(aggregate), data) // rebuilds the domain event from the DTO fields
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
index c3fa627064..d676aa8295 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
@@ -25,14 +25,13 @@ final case class TestEventDTO @JsonCreator() ( @JsonProperty("type") `type`: Str
                                     @JsonProperty("data") data: String,
                                     @JsonProperty("eventId") eventId: Int,
                                     @JsonProperty("aggregate") aggregate: Int) extends EventDTO {
-  override def getType: String = {
-  `type`
-}
+  override val getType: String = `type`
+
   def getData: String = data
 
   def getEventId: Long = eventId
 
   def getAggregate: Int = aggregate
 
-  @JsonIgnore def toEvent: TestEvent = new TestEvent(EventId.fromSerialized (eventId), TestAggregateId(aggregate), data)
+  @JsonIgnore def toEvent: TestEvent = TestEvent(EventId.fromSerialized(eventId), TestAggregateId(aggregate), data)
 }
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala
index 2df32a0b57..90d5f27908 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala
@@ -43,4 +43,16 @@ object TestEventDTOModules {
       event.getAggregateId.getId))
     .typeName("other-type")
     .withFactory(EventDTOModule.apply)
+
+  val SNAPSHOT_TYPE: EventDTOModule[SnapshotEvent, SnapshotEventDTO] = EventDTOModule // DTO module converting between SnapshotEvent and SnapshotEventDTO
+    .forEvent(classOf[SnapshotEvent])
+    .convertToDTO(classOf[SnapshotEventDTO])
+    .toDomainObjectConverter(_.toEvent) // DTO -> event
+    .toDTOConverter((event: SnapshotEvent, typeName: String) => SnapshotEventDTO( // event -> DTO
+      typeName,
+      event.getPayload,
+      event.eventId.serialize,
+      event.getAggregateId.getId))
+    .typeName("snapshot-type") // type name registered for snapshot events
+    .withFactory(EventDTOModule.apply)
 }
\ No newline at end of file
diff --git a/server/apps/cassandra-app/sample-configuration/jvm.properties b/server/apps/cassandra-app/sample-configuration/jvm.properties
index 7055fc8731..c4cf7f2ef3 100644
--- a/server/apps/cassandra-app/sample-configuration/jvm.properties
+++ b/server/apps/cassandra-app/sample-configuration/jvm.properties
@@ -56,5 +56,6 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
 
 # Disabling JMAP filters event source increments is necessary during rolling adoption of this change.
 # Defaults to true, meaning James will use JMAP filters event source increments, thus transparently and significantly
-# improving JMAP filter storage efficiency.
-# james.jmap.filters.eventsource.increments.enabled=true
\ No newline at end of file
+# improving JMAP filter storage efficiency. Snapshots allow the aggregate to be rebuilt from only the most recent events.
+# james.jmap.filters.eventsource.increments.enabled=true
+# james.jmap.filters.eventsource.snapshots.enabled=true
\ No newline at end of file
diff --git a/server/apps/distributed-app/sample-configuration/jvm.properties b/server/apps/distributed-app/sample-configuration/jvm.properties
index 2d5c47318e..1a462fc95d 100644
--- a/server/apps/distributed-app/sample-configuration/jvm.properties
+++ b/server/apps/distributed-app/sample-configuration/jvm.properties
@@ -56,5 +56,6 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
 
 # Disabling JMAP filters event source increments is necessary during rolling adoption of this change.
 # Defaults to true, meaning James will use JMAP filters event source increments, thus transparently and significantly
-# improving JMAP filter storage efficiency.
+# improving JMAP filter storage efficiency. Snapshots allow the aggregate to be rebuilt from only the most recent events.
 # james.jmap.filters.eventsource.increments.enabled=true
+# james.jmap.filters.eventsource.snapshots.enabled=true
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
index d5ce85418d..2828613713 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableList;
 
 public class FilteringAggregate {
     private static final boolean ENABLE_INCREMENTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.increments.enabled", "true"));
+    private static final boolean ENABLE_SNAPSHOTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.snapshots.enabled", "true"));
 
     public static FilteringAggregate load(FilteringAggregateId aggregateId, History eventsOfAggregate) {
         return new FilteringAggregate(aggregateId, eventsOfAggregate);
@@ -77,15 +78,24 @@ public class FilteringAggregate {
     }
 
     private ImmutableList<Event> generateEvents(DefineRulesCommand storeCommand) {
+        EventId nextEventId = history.getNextEventId(); // computed once and shared by every branch below
         if (ENABLE_INCREMENTS) {
-            return IncrementalRuleChange.ofDiff(aggregateId, history.getNextEventId(), state.rules, storeCommand.getRules())
+            // Snapshot periodically: once the loaded history holds 100 or more events, emit a full RuleSetDefined instead of a diff
+            if (ENABLE_SNAPSHOTS && history.getEvents().size() >= 100) {
+                return resetRules(storeCommand, nextEventId);
+            }
+            return IncrementalRuleChange.ofDiff(aggregateId, nextEventId, state.rules, storeCommand.getRules())
                 .map(ImmutableList::<Event>of)
-                .orElseGet(() -> ImmutableList.of(new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules()))));
+                .orElseGet(() -> resetRules(storeCommand, nextEventId)); // ofDiff produced no increment: store the full rule set
         } else {
-            return ImmutableList.of(new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules())));
+            return resetRules(storeCommand, nextEventId); // increments disabled: always store the full rule set
         }
     }
 
+    private ImmutableList<Event> resetRules(DefineRulesCommand storeCommand, EventId nextEventId) { // single RuleSetDefined event carrying the command's complete rule list
+        return ImmutableList.of(new RuleSetDefined(aggregateId, nextEventId, ImmutableList.copyOf(storeCommand.getRules())));
+    }
+
     private boolean shouldNotContainDuplicates(List<Rule> rules) {
         long uniqueIdCount = rules.stream()
             .map(Rule::getId)
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java
index a60736910e..1aa513d0e8 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java
@@ -55,6 +55,11 @@ public class RuleSetDefined implements Event {
         return rules;
     }
 
+    @Override
+    public boolean isASnapshot() { // a RuleSetDefined event always reports itself as a snapshot
+        return true;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (this == o) {


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org