You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/04/21 14:25:55 UTC
[james-project] 01/02: JAMES-3777 Snapshots for Filter event sourcing
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 812875de7124ef524f7c8e19c0a0c40883244e44
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Apr 19 14:15:57 2023 +0700
JAMES-3777 Snapshots for Filter event sourcing
Snapshots are stored as events. The offset of the event is
stored in a static column.
Non-invasive and easy to use.
---
.../org/apache/james/eventsourcing/Event.scala | 2 +
.../eventstore/cassandra/CassandraEventStore.scala | 17 +++---
.../cassandra/CassandraEventStoreModule.scala | 1 +
.../cassandra/CassandraEventStoreTable.scala | 1 +
.../eventstore/cassandra/EventStoreDao.scala | 71 ++++++++++++++++++----
...CassandraEventStoreExtensionForTestEvents.scala | 2 +-
.../cassandra/CassandraEventStoreTest.scala | 37 ++++++++++-
.../SnapshotEvent.scala} | 21 ++++---
.../{TestEventDTO.scala => SnapshotEventDTO.scala} | 15 +++--
.../eventstore/cassandra/dto/TestEventDTO.scala | 7 +--
.../cassandra/dto/TestEventDTOModules.scala | 12 ++++
.../sample-configuration/jvm.properties | 5 +-
.../sample-configuration/jvm.properties | 3 +-
.../api/filtering/impl/FilteringAggregate.java | 16 ++++-
.../jmap/api/filtering/impl/RuleSetDefined.java | 5 ++
15 files changed, 165 insertions(+), 50 deletions(-)
diff --git a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
index 5440ab03a2..4fbb7f8215 100644
--- a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
+++ b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
@@ -33,5 +33,7 @@ trait Event extends Comparable[Event] {
def getAggregateId: AggregateId
+ def isASnapshot: Boolean = false
+
override def compareTo(o: Event): Int = eventId.compareTo(o.eventId)
}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
index bdd3f713f0..f05d54b2d3 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStore.scala
@@ -20,25 +20,23 @@ package org.apache.james.eventsourcing.eventstore.cassandra
import com.google.common.base.Preconditions
import javax.inject.Inject
-
import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreFailedException, History}
-import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.apache.james.eventsourcing.{AggregateId, Event, EventId}
import org.reactivestreams.Publisher
-
import reactor.core.scala.publisher.SMono
class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends EventStore {
- override def appendAll(events: Iterable[Event]): Publisher[Void] = {
+ override def appendAll(events: Iterable[Event]): Publisher[Void] =
if (events.nonEmpty) {
doAppendAll(events)
} else {
SMono.empty
}
- }
private def doAppendAll(events: Iterable[Event]): SMono[Void] = {
Preconditions.checkArgument(Event.belongsToSameAggregate(events))
- eventStoreDao.appendAll(events)
+ val snapshotId = events.filter(_.isASnapshot).map(_.eventId).headOption
+ eventStoreDao.appendAll(events, snapshotId)
.filter(success => success)
.single()
.onErrorMap({
@@ -48,9 +46,10 @@ class CassandraEventStore @Inject() (eventStoreDao: EventStoreDao) extends Event
.`then`(SMono.empty)
}
- override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = {
- eventStoreDao.getEventsOfAggregate(aggregateId)
- }
+ override def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] =
+ eventStoreDao.getSnapshot(aggregateId)
+ .flatMap(snapshotId => eventStoreDao.getEventsOfAggregate(aggregateId, snapshotId))
+ .switchIfEmpty(eventStoreDao.getEventsOfAggregate(aggregateId))
override def remove(aggregateId: AggregateId): Publisher[Void] = eventStoreDao.delete(aggregateId)
}
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala
index c92e350ba6..ad2a7eb2b5 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreModule.scala
@@ -34,6 +34,7 @@ object CassandraEventStoreModule {
SchemaBuilder.RowsPerPartition.rows(CassandraConstants.DEFAULT_CACHED_ROW_PER_PARTITION)))
.statement(statement => _ => statement.withPartitionKey(CassandraEventStoreTable.AGGREGATE_ID, DataTypes.TEXT)
.withClusteringColumn(CassandraEventStoreTable.EVENT_ID, DataTypes.INT)
+ .withStaticColumn(CassandraEventStoreTable.SNAPSHOT, DataTypes.INT)
.withColumn(CassandraEventStoreTable.EVENT, DataTypes.TEXT))
.build
}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala
index 57c6754976..f60fce1143 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTable.scala
@@ -25,4 +25,5 @@ object CassandraEventStoreTable {
val AGGREGATE_ID = CqlIdentifier.fromCql("aggregateId")
val EVENT = CqlIdentifier.fromCql("event")
val EVENT_ID = CqlIdentifier.fromCql("eventId")
+ val SNAPSHOT = CqlIdentifier.fromCql("snapshot")
}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
index b83bb626b2..39abbbac15 100644
--- a/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
+++ b/event-sourcing/event-store-cassandra/src/main/scala/org/apache/james/eventsourcing/eventstore/cassandra/EventStoreDao.scala
@@ -23,21 +23,25 @@ import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.core.`type`.codec.TypeCodecs
import com.datastax.oss.driver.api.core.cql.{BatchStatementBuilder, BatchType, BoundStatement, PreparedStatement, Row, Statement}
import com.datastax.oss.driver.api.querybuilder.QueryBuilder
-import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, insertInto}
+import com.datastax.oss.driver.api.querybuilder.QueryBuilder.{bindMarker, insertInto, update}
import javax.inject.Inject
import org.apache.james.backends.cassandra.init.configuration.JamesExecutionProfiles
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor
import org.apache.james.eventsourcing.eventstore.History
-import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID, EVENT, EVENTS_TABLE, EVENT_ID}
-import org.apache.james.eventsourcing.{AggregateId, Event}
+import org.apache.james.eventsourcing.eventstore.cassandra.CassandraEventStoreTable.{AGGREGATE_ID, EVENT, EVENTS_TABLE, EVENT_ID, SNAPSHOT}
+import org.apache.james.eventsourcing.{AggregateId, Event, EventId}
import org.apache.james.util.ReactorUtils
+import org.reactivestreams.Publisher
import reactor.core.scala.publisher.{SFlux, SMono}
class EventStoreDao @Inject() (val session: CqlSession,
val jsonEventSerializer: JsonEventSerializer) {
private val cassandraAsyncExecutor = new CassandraAsyncExecutor(session)
private val insert = prepareInsert(session)
+ private val insertSnapshot = prepareInsertSnapshot(session)
private val select = prepareSelect(session)
+ private val selectFrom = prepareSelectFrom(session)
+ private val selectSnapshot = prepareSelectSnapshot(session)
private val deleteByAggregateId = prepareDelete(session)
private val executionProfile = JamesExecutionProfiles.getLWTProfile(session)
@@ -50,6 +54,13 @@ class EventStoreDao @Inject() (val session: CqlSession,
.ifNotExists
.build())
+ private def prepareInsertSnapshot(session: CqlSession): PreparedStatement =
+ session.prepare(
+ update(EVENTS_TABLE)
+ .setColumn(SNAPSHOT, bindMarker(SNAPSHOT))
+ .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
+ .build())
+
private def prepareSelect(session: CqlSession): PreparedStatement =
session.prepare(QueryBuilder
.selectFrom(EVENTS_TABLE)
@@ -57,14 +68,34 @@ class EventStoreDao @Inject() (val session: CqlSession,
.whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
.build())
+ private def prepareSelectSnapshot(session: CqlSession): PreparedStatement =
+ session.prepare(QueryBuilder
+ .selectFrom(EVENTS_TABLE)
+ .column(SNAPSHOT)
+ .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
+ .build())
+
+ private def prepareSelectFrom(session: CqlSession): PreparedStatement =
+ session.prepare(QueryBuilder
+ .selectFrom(EVENTS_TABLE)
+ .column(EVENT)
+ .whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
+ .whereColumn(EVENT_ID).isGreaterThanOrEqualTo(bindMarker(EVENT_ID))
+ .build())
+
private def prepareDelete(session: CqlSession): PreparedStatement =
session.prepare(QueryBuilder.deleteFrom(EVENTS_TABLE)
.whereColumn(AGGREGATE_ID).isEqualTo(bindMarker(AGGREGATE_ID))
.build())
- private[cassandra] def appendAll(events: Iterable[Event]): SMono[Boolean] =
+ private[cassandra] def appendAll(events: Iterable[Event], lastSnapShot: Option[EventId]): SMono[Boolean] =
SMono(cassandraAsyncExecutor.executeReturnApplied(appendQuery(events))
.map(_.booleanValue()))
+ .flatMap((success: Boolean) => lastSnapShot
+ .filter(_ => success)
+ .map(id => SMono(cassandraAsyncExecutor.executeVoid(insertSnapshot(events.head.getAggregateId, id))))
+ .getOrElse(SMono.empty)
+ .`then`(SMono.just(success)))
private def appendQuery(events: Iterable[Event]): Statement[_] =
if (events.size == 1)
@@ -82,18 +113,34 @@ class EventStoreDao @Inject() (val session: CqlSession,
.setInt(EVENT_ID, event.eventId.serialize)
.setString(EVENT, jsonEventSerializer.serialize(event))
- private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] = {
- val preparedStatement = select.bind()
+ private def insertSnapshot(aggregateId: AggregateId, snapshotId: EventId): BoundStatement =
+ insertSnapshot
+ .bind()
+ .setString(AGGREGATE_ID, aggregateId.asAggregateKey)
+ .setInt(SNAPSHOT,snapshotId.serialize)
+
+ private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId): SMono[History] =
+ asHistory(cassandraAsyncExecutor.executeRows(select.bind()
+ .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT)
+ .setExecutionProfile(executionProfile)))
+
+ private[cassandra] def getEventsOfAggregate(aggregateId: AggregateId, snapshotId: EventId): SMono[History] =
+ asHistory(cassandraAsyncExecutor.executeRows(selectFrom.bind()
.set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT)
- .setExecutionProfile(executionProfile)
- val rows: SFlux[Row] = SFlux[Row](cassandraAsyncExecutor.executeRows(preparedStatement))
+ .setInt(EVENT_ID, snapshotId.value)
+ .setExecutionProfile(executionProfile)))
- val events: SFlux[Event] = rows.concatMap(toEvent)
- val listEvents: SMono[List[Event]] = events.collectSeq()
+ private def asHistory(rows: Publisher[Row]): SMono[History] =
+ SFlux[Row](rows)
+ .concatMap(toEvent)
+ .collectSeq()
.map(_.toList)
+ .map(History.of(_))
- listEvents.map(History.of(_))
- }
+ private[cassandra] def getSnapshot(aggregateId: AggregateId): SMono[EventId] =
+ SMono(cassandraAsyncExecutor.executeSingleRow(selectSnapshot.bind()
+ .set(AGGREGATE_ID, aggregateId.asAggregateKey, TypeCodecs.TEXT)))
+ .map(row => EventId.fromSerialized(row.get(0, TypeCodecs.INT)))
def delete(aggregateId: AggregateId): SMono[Unit] =
SMono(cassandraAsyncExecutor.executeVoid(deleteByAggregateId
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala
index 9b1b7a6e32..c7002efd5b 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreExtensionForTestEvents.scala
@@ -21,4 +21,4 @@ package org.apache.james.eventsourcing.eventstore.cassandra
import org.apache.james.eventsourcing.eventstore.cassandra.dto.TestEventDTOModules
import org.junit.jupiter.api.extension.Extension
-class CassandraEventStoreExtensionForTestEvents extends CassandraEventStoreExtension(JsonEventSerializer.forModules(TestEventDTOModules.TEST_TYPE).withoutNestedType) with Extension
+class CassandraEventStoreExtensionForTestEvents extends CassandraEventStoreExtension(JsonEventSerializer.forModules(TestEventDTOModules.TEST_TYPE, TestEventDTOModules.SNAPSHOT_TYPE).withoutNestedType) with Extension
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
index 47c266ef4a..5d20fbeebe 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
@@ -18,8 +18,41 @@
****************************************************************/
package org.apache.james.eventsourcing.eventstore.cassandra
-import org.apache.james.eventsourcing.eventstore.EventStoreContract
+import org.apache.james.eventsourcing.eventstore.cassandra.dto.SnapshotEvent
+import org.apache.james.eventsourcing.{EventId, TestEvent}
+import org.apache.james.eventsourcing.eventstore.{EventStore, EventStoreContract, History}
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.Test
import org.junit.jupiter.api.extension.ExtendWith
+import reactor.core.scala.publisher.SMono
@ExtendWith(Array(classOf[CassandraEventStoreExtensionForTestEvents]))
-class CassandraEventStoreTest extends EventStoreContract
\ No newline at end of file
+class CassandraEventStoreTest extends EventStoreContract {
+ @Test
+ def getEventsOfAggregateShouldResumeFromSnapshot(testee: EventStore) : Unit = {
+ val event1 = TestEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first")
+ val event2 = SnapshotEvent(EventId.first.next, EventStoreContract.AGGREGATE_1, "second")
+ val event3 = TestEvent(EventId.first.next.next, EventStoreContract.AGGREGATE_1, "third")
+
+ SMono(testee.append(event1)).block()
+ SMono(testee.append(event2)).block()
+ SMono(testee.append(event3)).block()
+
+ assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
+ .isEqualTo(History.of(event2, event3))
+ }
+
+ @Test
+ def getEventsOfAggregateShouldResumeFromLatestSnapshot(testee: EventStore) : Unit = {
+ val event1 = SnapshotEvent(EventId.first, EventStoreContract.AGGREGATE_1, "first")
+ val event2 = TestEvent(EventId.first.next, EventStoreContract.AGGREGATE_1, "second")
+ val event3 = SnapshotEvent(EventId.first.next.next, EventStoreContract.AGGREGATE_1, "third")
+
+ SMono(testee.append(event1)).block()
+ SMono(testee.append(event2)).block()
+ SMono(testee.append(event3)).block()
+
+ assertThat(SMono(testee.getEventsOfAggregate(EventStoreContract.AGGREGATE_1)).block())
+ .isEqualTo(History.of(event3))
+ }
+}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala
similarity index 63%
copy from event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
copy to event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala
index 47c266ef4a..07f9cfa4bb 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/CassandraEventStoreTest.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEvent.scala
@@ -1,4 +1,4 @@
- /***************************************************************
+/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one *
* or more contributor license agreements. See the NOTICE file *
* distributed with this work for additional information *
@@ -7,7 +7,7 @@
* "License"); you may not use this file except in compliance *
* with the License. You may obtain a copy of the License at *
* *
- * http://www.apache.org/licenses/LICENSE-2.0 *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
* *
* Unless required by applicable law or agreed to in writing, *
* software distributed under the License is distributed on an *
@@ -15,11 +15,16 @@
* KIND, either express or implied. See the License for the *
* specific language governing permissions and limitations *
* under the License. *
- ****************************************************************/
-package org.apache.james.eventsourcing.eventstore.cassandra
+ * ***************************************************************/
+package org.apache.james.eventsourcing.eventstore.cassandra.dto
-import org.apache.james.eventsourcing.eventstore.EventStoreContract
-import org.junit.jupiter.api.extension.ExtendWith
+import org.apache.james.eventsourcing.{Event, EventId, TestAggregateId}
-@ExtendWith(Array(classOf[CassandraEventStoreExtensionForTestEvents]))
-class CassandraEventStoreTest extends EventStoreContract
\ No newline at end of file
+final case class SnapshotEvent(override val eventId: EventId, aggregateId: TestAggregateId, payload: String) extends Event {
+
+ override def getAggregateId = aggregateId
+
+ def getPayload = payload
+
+ override val isASnapshot = true
+}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala
similarity index 73%
copy from event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
copy to event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala
index c3fa627064..dedec58367 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/SnapshotEventDTO.scala
@@ -21,18 +21,17 @@ package org.apache.james.eventsourcing.eventstore.cassandra.dto
import com.fasterxml.jackson.annotation.{JsonCreator, JsonIgnore, JsonProperty}
import org.apache.james.eventsourcing.{EventId, TestAggregateId, TestEvent}
-final case class TestEventDTO @JsonCreator() ( @JsonProperty("type") `type`: String,
- @JsonProperty("data") data: String,
- @JsonProperty("eventId") eventId: Int,
- @JsonProperty("aggregate") aggregate: Int) extends EventDTO {
- override def getType: String = {
- `type`
-}
+final case class SnapshotEventDTO @JsonCreator()(@JsonProperty("type") `type`: String,
+ @JsonProperty("data") data: String,
+ @JsonProperty("eventId") eventId: Int,
+ @JsonProperty("aggregate") aggregate: Int) extends EventDTO {
+ override def getType: String = `type`
+
def getData: String = data
def getEventId: Long = eventId
def getAggregate: Int = aggregate
- @JsonIgnore def toEvent: TestEvent = new TestEvent(EventId.fromSerialized (eventId), TestAggregateId(aggregate), data)
+ @JsonIgnore def toEvent: SnapshotEvent = SnapshotEvent(EventId.fromSerialized(eventId), TestAggregateId(aggregate), data)
}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
index c3fa627064..d676aa8295 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTO.scala
@@ -25,14 +25,13 @@ final case class TestEventDTO @JsonCreator() ( @JsonProperty("type") `type`: Str
@JsonProperty("data") data: String,
@JsonProperty("eventId") eventId: Int,
@JsonProperty("aggregate") aggregate: Int) extends EventDTO {
- override def getType: String = {
- `type`
-}
+ override val getType: String = `type`
+
def getData: String = data
def getEventId: Long = eventId
def getAggregate: Int = aggregate
- @JsonIgnore def toEvent: TestEvent = new TestEvent(EventId.fromSerialized (eventId), TestAggregateId(aggregate), data)
+ @JsonIgnore def toEvent: TestEvent = TestEvent(EventId.fromSerialized(eventId), TestAggregateId(aggregate), data)
}
\ No newline at end of file
diff --git a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala
index 2df32a0b57..90d5f27908 100644
--- a/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala
+++ b/event-sourcing/event-store-cassandra/src/test/scala/org/apache/james/eventsourcing/eventstore/cassandra/dto/TestEventDTOModules.scala
@@ -43,4 +43,16 @@ object TestEventDTOModules {
event.getAggregateId.getId))
.typeName("other-type")
.withFactory(EventDTOModule.apply)
+
+ val SNAPSHOT_TYPE: EventDTOModule[SnapshotEvent, SnapshotEventDTO] = EventDTOModule
+ .forEvent(classOf[SnapshotEvent])
+ .convertToDTO(classOf[SnapshotEventDTO])
+ .toDomainObjectConverter(_.toEvent)
+ .toDTOConverter((event: SnapshotEvent, typeName: String) => SnapshotEventDTO(
+ typeName,
+ event.getPayload,
+ event.eventId.serialize,
+ event.getAggregateId.getId))
+ .typeName("snapshot-type")
+ .withFactory(EventDTOModule.apply)
}
\ No newline at end of file
diff --git a/server/apps/cassandra-app/sample-configuration/jvm.properties b/server/apps/cassandra-app/sample-configuration/jvm.properties
index 7055fc8731..c4cf7f2ef3 100644
--- a/server/apps/cassandra-app/sample-configuration/jvm.properties
+++ b/server/apps/cassandra-app/sample-configuration/jvm.properties
@@ -56,5 +56,6 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
# Disabling JMAP filters event source increments is necessary during rolling adoption of this change.
# Defaults to true, meaning James will use JMAP filters event source increments, thus transparently and significantly
-# improving JMAP filter storage efficiency.
-# james.jmap.filters.eventsource.increments.enabled=true
\ No newline at end of file
+# improving JMAP filter storage efficiency. Snapshots allow the aggregate to be rebuilt from only the last few events.
+# james.jmap.filters.eventsource.increments.enabled=true
+# james.jmap.filters.eventsource.snapshots.enabled=true
\ No newline at end of file
diff --git a/server/apps/distributed-app/sample-configuration/jvm.properties b/server/apps/distributed-app/sample-configuration/jvm.properties
index 2d5c47318e..1a462fc95d 100644
--- a/server/apps/distributed-app/sample-configuration/jvm.properties
+++ b/server/apps/distributed-app/sample-configuration/jvm.properties
@@ -56,5 +56,6 @@ jmx.remote.x.mlet.allow.getMBeansFromURL=false
# Disabling JMAP filters event source increments is necessary during rolling adoption of this change.
# Defaults to true, meaning James will use JMAP filters event source increments, thus transparently and significantly
-# improving JMAP filter storage efficiency.
+# improving JMAP filter storage efficiency. Snapshots allow the aggregate to be rebuilt from only the last few events.
# james.jmap.filters.eventsource.increments.enabled=true
+# james.jmap.filters.eventsource.snapshots.enabled=true
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
index d5ce85418d..2828613713 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
@@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableList;
public class FilteringAggregate {
private static final boolean ENABLE_INCREMENTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.increments.enabled", "true"));
+ private static final boolean ENABLE_SNAPSHOTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.snapshots.enabled", "true"));
public static FilteringAggregate load(FilteringAggregateId aggregateId, History eventsOfAggregate) {
return new FilteringAggregate(aggregateId, eventsOfAggregate);
@@ -77,15 +78,24 @@ public class FilteringAggregate {
}
private ImmutableList<Event> generateEvents(DefineRulesCommand storeCommand) {
+ EventId nextEventId = history.getNextEventId();
if (ENABLE_INCREMENTS) {
- return IncrementalRuleChange.ofDiff(aggregateId, history.getNextEventId(), state.rules, storeCommand.getRules())
+ // SNAPSHOT periodically
+ if (ENABLE_SNAPSHOTS && history.getEvents().size() >= 100) {
+ return resetRules(storeCommand, nextEventId);
+ }
+ return IncrementalRuleChange.ofDiff(aggregateId, nextEventId, state.rules, storeCommand.getRules())
.map(ImmutableList::<Event>of)
- .orElseGet(() -> ImmutableList.of(new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules()))));
+ .orElseGet(() -> resetRules(storeCommand, nextEventId));
} else {
- return ImmutableList.of(new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules())));
+ return resetRules(storeCommand, nextEventId);
}
}
+ private ImmutableList<Event> resetRules(DefineRulesCommand storeCommand, EventId nextEventId) {
+ return ImmutableList.of(new RuleSetDefined(aggregateId, nextEventId, ImmutableList.copyOf(storeCommand.getRules())));
+ }
+
private boolean shouldNotContainDuplicates(List<Rule> rules) {
long uniqueIdCount = rules.stream()
.map(Rule::getId)
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java
index a60736910e..1aa513d0e8 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/RuleSetDefined.java
@@ -55,6 +55,11 @@ public class RuleSetDefined implements Event {
return rules;
}
+ @Override
+ public boolean isASnapshot() {
+ return true;
+ }
+
@Override
public final boolean equals(Object o) {
if (this == o) {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org