You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2023/04/24 08:50:32 UTC
[james-project] branch master updated: JAMES-3777 EventSource subscriber should work with immutable state copy
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 395dad3c32 JAMES-3777 EventSource subscriber should work with immutable state copy
395dad3c32 is described below
commit 395dad3c3286185ca55c9f694448158452bc6061
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Apr 21 18:15:18 2023 +0700
JAMES-3777 EventSource subscriber should work with immutable state copy
This ease work of subscriber and prevent some reads
---
.../apache/james/eventsourcing/CommandHandler.java | 2 +-
.../james/eventsourcing/CommandDispatcher.scala | 8 ++-
.../org/apache/james/eventsourcing/EventBus.scala | 14 ++---
.../apache/james/eventsourcing/Subscriber.scala | 10 +--
.../eventsourcing/DataCollectorSubscriber.scala | 2 +-
.../eventsourcing/EventSourcingSystemTest.scala | 13 ++--
.../org/apache/james/eventsourcing/Event.scala | 10 ++-
.../james/eventsourcing/eventstore/History.scala | 4 +-
.../mail/eventsourcing/acl/AclV2DAOSubscriber.java | 4 +-
.../eventsourcing/acl/DeleteMailboxCommand.java | 4 +-
.../eventsourcing/acl/MailboxACLAggregate.java | 25 +++++---
.../mail/eventsourcing/acl/SetACLCommand.java | 4 +-
.../mail/eventsourcing/acl/UpdateACLCommand.java | 4 +-
.../eventsourcing/acl/UserRightsDAOSubscriber.java | 4 +-
.../mailing/aggregates/UserQuotaThresholds.java | 9 ++-
.../commands/DetectThresholdCrossingHandler.java | 4 +-
.../mailing/subscribers/QuotaThresholdMailer.java | 4 +-
.../RegisterStorageStrategyCommandHandler.java | 4 +-
.../validation/StorageStrategyAggregate.java | 6 +-
.../filtering/CassandraFilteringProjection.java | 73 ++++++++++++----------
.../filtering/impl/DefineRulesCommandHandler.java | 5 +-
.../api/filtering/impl/FilteringAggregate.java | 41 +++++++-----
.../aggregates/DLPDomainConfiguration.java | 16 ++---
.../commands/ClearCommandHandler.java | 4 +-
.../commands/StoreCommandHandler.java | 4 +-
.../data/jmap/PopulateFilteringProjectionTask.java | 8 ++-
...pulateFilteringProjectionRequestToTaskTest.java | 20 ++----
.../configuration/ConfigurationAggregate.java | 9 +--
.../RegisterConfigurationCommandHandler.java | 4 +-
.../james/task/eventsourcing/CommandHandlers.scala | 31 +++++----
.../eventsourcing/EventSourcingTaskManager.scala | 2 +-
.../TaskExecutionDetailsProjection.scala | 2 +-
.../task/eventsourcing/TerminationSubscriber.scala | 6 +-
.../TerminationSubscriberContract.java | 2 +
34 files changed, 204 insertions(+), 158 deletions(-)
diff --git a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
index dd690832f0..129cefdc58 100644
--- a/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
+++ b/event-sourcing/event-sourcing-core/src/main/java/org/apache/james/eventsourcing/CommandHandler.java
@@ -25,5 +25,5 @@ import org.reactivestreams.Publisher;
public interface CommandHandler<C extends Command> {
Class<C> handledClass();
- Publisher<List<? extends Event>> handle(C command);
+ Publisher<List<EventWithState>> handle(C command);
}
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
index ce2e40c4d8..14f45df78b 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/CommandDispatcher.scala
@@ -19,8 +19,9 @@
package org.apache.james.eventsourcing
import java.util
-import com.google.common.base.Preconditions
+import com.google.common.base.Preconditions
+import com.google.common.collect.ImmutableList
import javax.inject.Inject
import org.apache.james.eventsourcing.eventstore.EventStoreFailedException
import org.reactivestreams.Publisher
@@ -74,13 +75,14 @@ class CommandDispatcher @Inject()(eventBus: EventBus, handlers: Set[CommandHandl
handleCommand(c) match {
case Some(eventsPublisher) =>
SMono(eventsPublisher)
- .flatMap(events => eventBus.publish(events.asScala).`then`(SMono.just(events)))
+ .flatMap(events => eventBus.publish(events.asScala)
+ .`then`(SMono.just(events.asScala.map(_.event).asJava)))
case _ =>
SMono.error(CommandDispatcher.UnknownCommandException(c))
}
}
- private def handleCommand(c: Command): Option[Publisher[util.List[_ <: Event]]] = {
+ private def handleCommand(c: Command): Option[Publisher[util.List[EventWithState]]] = {
handlersByClass
.get(c.getClass)
.map(commandHandler =>
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
index ad1e310e9b..1f189cf0ca 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/EventBus.scala
@@ -30,20 +30,20 @@ object EventBus {
class EventBus @Inject() (eventStore: EventStore, subscribers: Set[Subscriber]) {
@throws[EventStoreFailedException]
- def publish(events: Iterable[Event]): SMono[Void] =
- SMono(eventStore.appendAll(events))
+ def publish(events: Iterable[EventWithState]): SMono[Void] =
+ SMono(eventStore.appendAll(events.map(_.event)))
.`then`(runHandlers(events))
- private def runHandlers(events: Iterable[Event]): SMono[Void] =
- SFlux.fromIterable(events.flatMap((event: Event) => subscribers.map(subscriber => (event, subscriber))))
+ private def runHandlers(events: Iterable[EventWithState]): SMono[Void] =
+ SFlux.fromIterable(events.flatMap((commandExecuted: EventWithState) => subscribers.map(subscriber => (commandExecuted, subscriber))))
.concatMap(infos => runHandler(infos._1, infos._2))
.`then`()
.`then`(SMono.empty)
- private def runHandler(event: Event, subscriber: Subscriber): Publisher[Void] =
- SMono(ReactiveSubscriber.asReactiveSubscriber(subscriber).handleReactive(event))
+ private def runHandler(commandExecuted: EventWithState, subscriber: Subscriber): Publisher[Void] =
+ SMono(ReactiveSubscriber.asReactiveSubscriber(subscriber).handleReactive(commandExecuted))
.onErrorResume(e => {
- EventBus.LOGGER.error("Error while calling {} for {}", subscriber, event, e)
+ EventBus.LOGGER.error("Error while calling {} for {}", subscriber, commandExecuted, e)
SMono.empty
})
}
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
index 4b823c417f..1acd9000c3 100644
--- a/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
+++ b/event-sourcing/event-sourcing-core/src/main/scala/org/apache/james/eventsourcing/Subscriber.scala
@@ -23,13 +23,13 @@ import org.reactivestreams.Publisher
import reactor.core.scala.publisher.SMono
trait Subscriber {
- def handle(event: Event) : Unit
+ def handle(commandExecuted: EventWithState) : Unit
}
trait ReactiveSubscriber extends Subscriber {
- def handleReactive(event: Event): Publisher[Void]
+ def handleReactive(commandExecuted: EventWithState): Publisher[Void]
- override def handle(event: Event) : Unit = SMono(handleReactive(event)).block()
+ override def handle(commandExecuted: EventWithState) : Unit = SMono(handleReactive(commandExecuted)).block()
}
object ReactiveSubscriber {
@@ -40,9 +40,9 @@ object ReactiveSubscriber {
}
class ReactiveSubscriberWrapper(delegate: Subscriber) extends ReactiveSubscriber {
- override def handle(event: Event) : Unit = delegate.handle(event)
+ override def handle(commandExecuted: EventWithState) : Unit = delegate.handle(commandExecuted)
- def handleReactive(event: Event): Publisher[Void] = SMono.fromCallable(() => handle(event))
+ def handleReactive(commandExecuted: EventWithState): Publisher[Void] = SMono.fromCallable(() => handle(commandExecuted))
.subscribeOn(ReactorUtils.BLOCKING_CALL_WRAPPER)
.`then`()
}
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala
index 58e3fc987a..2e0912f348 100644
--- a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala
+++ b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/DataCollectorSubscriber.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable
class DataCollectorSubscriber() extends Subscriber {
private val data = new mutable.ListBuffer[String]
- override def handle(event: Event): Unit = event match {
+ override def handle(event: EventWithState): Unit = event.event match {
case event: TestEvent => data += event.getData
}
diff --git a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
index 55559074c9..e658841c97 100644
--- a/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
+++ b/event-sourcing/event-sourcing-core/src/test/scala/org/apache/james/eventsourcing/EventSourcingSystemTest.scala
@@ -73,7 +73,7 @@ trait EventSourcingSystemTest {
val subscriber = new DataCollectorSubscriber
val eventSourcingSystem = new EventSourcingSystem(
Set(simpleDispatcher(eventStore)),
- Set((_: Event) => throw new RuntimeException, subscriber),
+ Set((_: EventWithState) => throw new RuntimeException, subscriber),
eventStore)
Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block()
assertThat(subscriber.getData.asJava).containsExactly(EventSourcingSystemTest.PAYLOAD_1)
@@ -87,7 +87,7 @@ trait EventSourcingSystemTest {
val subscriber = new DataCollectorSubscriber
val eventSourcingSystem = new EventSourcingSystem(
Set(simpleDispatcher(eventStore)),
- Set((_: Event) => throw new RuntimeException, subscriber),
+ Set((_: EventWithState) => throw new RuntimeException, subscriber),
eventStore)
assertThatThrownBy(() => Mono.from(eventSourcingSystem.dispatch(new EventSourcingSystemTest.MyCommand(EventSourcingSystemTest.PAYLOAD_1))).block())
.isInstanceOf(classOf[RuntimeException])
@@ -151,22 +151,23 @@ trait EventSourcingSystemTest {
def simpleDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() {
override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand]
- override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = {
+ override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[EventWithState]] = {
SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
- .map(history => Seq(TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload)).asJava)
+ .map(history => Seq(EventWithState.noState(
+ TestEvent(history.getNextEventId, EventSourcingSystemTest.AGGREGATE_ID, myCommand.getPayload))).asJava)
}
}
def wordCuttingDispatcher(eventStore: EventStore) = new CommandHandler[EventSourcingSystemTest.MyCommand]() {
override def handledClass: Class[EventSourcingSystemTest.MyCommand] = classOf[EventSourcingSystemTest.MyCommand]
- override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[_ <: Event]] = {
+ override def handle(myCommand: EventSourcingSystemTest.MyCommand): Publisher[JavaList[EventWithState]] = {
SMono.apply(eventStore.getEventsOfAggregate(EventSourcingSystemTest.AGGREGATE_ID))
.map(history => new EventSourcingSystemTest.EventIdIncrementer(history.getNextEventId))
.map(eventIdIncrementer => Splitter.on(" ").splitToList(myCommand.getPayload)
.asScala
.toList
- .map((word: String) => TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word)).asJava)
+ .map((word: String) => EventWithState.noState(TestEvent(eventIdIncrementer.next, EventSourcingSystemTest.AGGREGATE_ID, word))).asJava)
}
}
}
\ No newline at end of file
diff --git a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
index 4fbb7f8215..82e295ec34 100644
--- a/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
+++ b/event-sourcing/event-sourcing-pojo/src/main/scala/org/apache/james/eventsourcing/Event.scala
@@ -36,4 +36,12 @@ trait Event extends Comparable[Event] {
def isASnapshot: Boolean = false
override def compareTo(o: Event): Int = eventId.compareTo(o.eventId)
-}
\ No newline at end of file
+}
+
+trait ImmutableState
+
+object EventWithState {
+ def noState(event: Event): EventWithState = EventWithState(event, None)
+}
+
+case class EventWithState(event: Event, state: Option[ImmutableState])
\ No newline at end of file
diff --git a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala
index 9eeff9d076..d55c2e2958 100644
--- a/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala
+++ b/event-sourcing/event-store-api/src/main/scala/org/apache/james/eventsourcing/eventstore/History.scala
@@ -52,9 +52,9 @@ final case class History private(events: List[Event]) {
def getVersionAsJava: Optional[EventId] = getVersion.toJava
- def getEvents:List[Event] = events
+ def getEvents: List[Event] = events
- def getEventsJava:java.util.List[Event] = events.asJava
+ def getEventsJava: java.util.List[Event] = events.asJava
def getNextEventId: EventId = getVersion
.map(eventId => eventId.next)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
index 1b3ecada56..d338858138 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/AclV2DAOSubscriber.java
@@ -20,6 +20,7 @@
package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.ReactiveSubscriber;
import org.apache.james.mailbox.cassandra.mail.CassandraACLDAOV2;
@@ -34,7 +35,8 @@ public class AclV2DAOSubscriber implements ReactiveSubscriber {
}
@Override
- public Mono<Void> handleReactive(Event event) {
+ public Mono<Void> handleReactive(EventWithState eventWithState) {
+ Event event = eventWithState.event();
if (event instanceof ACLUpdated) {
ACLUpdated aclUpdated = (ACLUpdated) event;
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java
index 55c2af9d0f..feec9c4646 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/DeleteMailboxCommand.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
import java.util.List;
import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;
@@ -42,7 +42,7 @@ public class DeleteMailboxCommand implements Command {
}
@Override
- public Publisher<List<? extends Event>> handle(DeleteMailboxCommand command) {
+ public Publisher<List<EventWithState>> handle(DeleteMailboxCommand command) {
return Mono.from(eventStore.getEventsOfAggregate(command.getId()))
.map(history -> MailboxACLAggregate.load(command.getId(), history))
.map(MailboxACLAggregate::deleteMailbox);
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java
index 79ca0d5098..f419455e6b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/MailboxACLAggregate.java
@@ -23,6 +23,7 @@ import java.util.List;
import java.util.Optional;
import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.History;
import org.apache.james.mailbox.acl.ACLDiff;
import org.apache.james.mailbox.exception.UnsupportedRightException;
@@ -65,21 +66,27 @@ public class MailboxACLAggregate {
.forEach(this::apply);
}
- public List<Event> deleteMailbox() {
- return ImmutableList.of(new ACLUpdated(aggregateId, history.getNextEventId(),
- ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), MailboxACL.EMPTY)));
+ public List<EventWithState> deleteMailbox() {
+ ACLUpdated event = new ACLUpdated(aggregateId, history.getNextEventId(),
+ ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), MailboxACL.EMPTY));
+ apply(event);
+ return ImmutableList.of(EventWithState.noState(event));
}
- public List<Event> set(SetACLCommand setACLCommand) {
- return ImmutableList.of(new ACLUpdated(aggregateId, history.getNextEventId(),
- ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), setACLCommand.getAcl())));
+ public List<EventWithState> set(SetACLCommand setACLCommand) {
+ ACLUpdated event = new ACLUpdated(aggregateId, history.getNextEventId(),
+ ACLDiff.computeDiff(state.acl.orElse(MailboxACL.EMPTY), setACLCommand.getAcl()));
+ apply(event);
+ return ImmutableList.of(EventWithState.noState(event));
}
- public List<Event> update(UpdateACLCommand command) throws UnsupportedRightException {
+ public List<EventWithState> update(UpdateACLCommand command) throws UnsupportedRightException {
MailboxACL oldACL = state.acl.orElse(MailboxACL.EMPTY);
- return ImmutableList.of(new ACLUpdated(command.getId(), history.getNextEventId(),
+ ACLUpdated event = new ACLUpdated(command.getId(), history.getNextEventId(),
ACLDiff.computeDiff(oldACL,
- oldACL.apply(command.getAclCommand()))));
+ oldACL.apply(command.getAclCommand())));
+ apply(event);
+ return ImmutableList.of(EventWithState.noState(event));
}
private void apply(Event event) {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java
index 8b797f5c97..a0f44b3659 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/SetACLCommand.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
import java.util.List;
import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.mailbox.model.MailboxACL;
import org.reactivestreams.Publisher;
@@ -43,7 +43,7 @@ public class SetACLCommand implements Command {
}
@Override
- public Publisher<List<? extends Event>> handle(SetACLCommand command) {
+ public Publisher<List<EventWithState>> handle(SetACLCommand command) {
return Mono.from(eventStore.getEventsOfAggregate(command.getId()))
.map(history -> MailboxACLAggregate.load(command.getId(), history))
.map(aggregate -> aggregate.set(command));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java
index 607657fb36..b5222b6729 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UpdateACLCommand.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
import java.util.List;
import org.apache.james.eventsourcing.Command;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.mailbox.model.MailboxACL;
import org.reactivestreams.Publisher;
@@ -45,7 +45,7 @@ public class UpdateACLCommand implements Command {
}
@Override
- public Publisher<List<? extends Event>> handle(UpdateACLCommand command) {
+ public Publisher<List<EventWithState>> handle(UpdateACLCommand command) {
return Mono.from(eventStore.getEventsOfAggregate(command.getId()))
.map(history -> MailboxACLAggregate.load(command.getId(), history))
.map(Throwing.function(aggregate -> aggregate.update(command)));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
index 32eec22492..1e76ad5e0d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/eventsourcing/acl/UserRightsDAOSubscriber.java
@@ -20,6 +20,7 @@
package org.apache.james.mailbox.cassandra.mail.eventsourcing.acl;
import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.ReactiveSubscriber;
import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
import org.reactivestreams.Publisher;
@@ -34,7 +35,8 @@ public class UserRightsDAOSubscriber implements ReactiveSubscriber {
}
@Override
- public Publisher<Void> handleReactive(Event event) {
+ public Publisher<Void> handleReactive(EventWithState eventWithState) {
+ Event event = eventWithState.event();
if (event instanceof ACLUpdated) {
ACLUpdated aclUpdated = (ACLUpdated) event;
return userRightsDAO.update(aclUpdated.mailboxId(), aclUpdated.getAclDiff());
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java
index 59d0e0d168..47f826047a 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/aggregates/UserQuotaThresholds.java
@@ -32,6 +32,7 @@ import org.apache.james.core.quota.QuotaCountUsage;
import org.apache.james.core.quota.QuotaSizeLimit;
import org.apache.james.core.quota.QuotaSizeUsage;
import org.apache.james.eventsourcing.AggregateId;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.History;
import org.apache.james.mailbox.model.Quota;
import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
@@ -131,8 +132,8 @@ public class UserQuotaThresholds {
.collect(Collectors.toList());
}
- public List<QuotaThresholdChangedEvent> detectThresholdCrossing(QuotaMailingListenerConfiguration configuration,
- DetectThresholdCrossing command) {
+ public List<EventWithState> detectThresholdCrossing(QuotaMailingListenerConfiguration configuration,
+ DetectThresholdCrossing command) {
List<QuotaThresholdChangedEvent> events = generateEvents(
configuration.getThresholds(),
@@ -141,7 +142,9 @@ public class UserQuotaThresholds {
command.getSizeQuota(),
command.getInstant());
events.forEach(this::apply);
- return events;
+ return events.stream()
+ .map(EventWithState::noState)
+ .collect(ImmutableList.toImmutableList());
}
private List<QuotaThresholdChangedEvent> generateEvents(QuotaThresholds configuration, Duration gracePeriod, Quota<QuotaCountLimit, QuotaCountUsage> countQuota, Quota<QuotaSizeLimit, QuotaSizeUsage> sizeQuota, Instant now) {
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
index 551644c31f..cd66c3208e 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/commands/DetectThresholdCrossingHandler.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.quota.mailing.commands;
import java.util.List;
import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
import org.apache.james.mailbox.quota.mailing.aggregates.UserQuotaThresholds;
@@ -43,7 +43,7 @@ public class DetectThresholdCrossingHandler implements CommandHandler<DetectThre
}
@Override
- public Publisher<List<? extends Event>> handle(DetectThresholdCrossing command) {
+ public Publisher<List<EventWithState>> handle(DetectThresholdCrossing command) {
return loadAggregate(command)
.map(aggregate -> aggregate.detectThresholdCrossing(quotaMailingListenerConfiguration, command));
}
diff --git a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java
index ddd06ffa16..117e130e6c 100644
--- a/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java
+++ b/mailbox/plugin/quota-mailing/src/main/java/org/apache/james/mailbox/quota/mailing/subscribers/QuotaThresholdMailer.java
@@ -27,6 +27,7 @@ import javax.mail.MessagingException;
import org.apache.james.core.MailAddress;
import org.apache.james.core.Username;
import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.Subscriber;
import org.apache.james.filesystem.api.FileSystem;
import org.apache.james.mailbox.quota.mailing.QuotaMailingListenerConfiguration;
@@ -52,7 +53,8 @@ public class QuotaThresholdMailer implements Subscriber {
}
@Override
- public void handle(Event event) {
+ public void handle(EventWithState eventWithState) {
+ Event event = eventWithState.event();
if (event instanceof QuotaThresholdChangedEvent) {
handleEvent((QuotaThresholdChangedEvent) event);
}
diff --git a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java
index ddd69ff9f9..9f8d507609 100644
--- a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java
+++ b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/RegisterStorageStrategyCommandHandler.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;
@@ -45,7 +45,7 @@ public class RegisterStorageStrategyCommandHandler implements CommandHandler<Reg
}
@Override
- public Publisher<List<? extends Event>> handle(RegisterStorageStrategy command) {
+ public Publisher<List<EventWithState>> handle(RegisterStorageStrategy command) {
return Mono.from(eventStore.getEventsOfAggregate(AGGREGATE_ID))
.map(history -> StorageStrategyAggregate.load(AGGREGATE_ID, history))
.map(aggregate -> aggregate.registerStorageStrategy(command));
diff --git a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java
index e949bd46b3..1d1b8f6356 100644
--- a/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java
+++ b/server/container/guice/blob/deduplication-gc/src/main/java/org/apache/james/modules/blobstore/validation/StorageStrategyAggregate.java
@@ -27,6 +27,7 @@ import java.util.Optional;
import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.History;
import org.apache.james.server.blob.deduplication.StorageStrategy;
@@ -81,7 +82,7 @@ public class StorageStrategyAggregate {
.forEach(this::apply);
}
- public List<Event> registerStorageStrategy(RegisterStorageStrategy command) {
+ public List<EventWithState> registerStorageStrategy(RegisterStorageStrategy command) {
if (state.holds(command.getStorageStrategy())) {
return ImmutableList.of();
}
@@ -93,7 +94,8 @@ public class StorageStrategyAggregate {
state.getStorageStrategy()));
}
- return ImmutableList.of(new StorageStrategyChanged(history.getNextEventId(), aggregateId, command.getStorageStrategy()));
+ return ImmutableList.of(EventWithState.noState(
+ new StorageStrategyChanged(history.getNextEventId(), aggregateId, command.getStorageStrategy())));
}
private void apply(Event event) {
diff --git a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
index 486dddf655..0282d7c683 100644
--- a/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
+++ b/server/data/data-jmap-cassandra/src/main/java/org/apache/james/jmap/cassandra/filtering/CassandraFilteringProjection.java
@@ -1,3 +1,22 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
package org.apache.james.jmap.cassandra.filtering;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.bindMarker;
@@ -19,14 +38,14 @@ import org.apache.james.core.Username;
import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.ReactiveSubscriber;
import org.apache.james.jmap.api.filtering.Rule;
import org.apache.james.jmap.api.filtering.Rules;
import org.apache.james.jmap.api.filtering.Version;
import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.jmap.api.filtering.impl.FilteringAggregate;
import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
-import org.apache.james.jmap.api.filtering.impl.IncrementalRuleChange;
-import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
import org.reactivestreams.Publisher;
import com.datastax.oss.driver.api.core.CqlSession;
@@ -39,7 +58,7 @@ import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Mono;
-public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection {
+public class CassandraFilteringProjection implements EventSourcingFilteringManagement.ReadProjection, ReactiveSubscriber {
private final CassandraAsyncExecutor executor;
private final PreparedStatement insertStatement;
@@ -89,39 +108,25 @@ public class CassandraFilteringProjection implements EventSourcingFilteringManag
@Override
public Optional<ReactiveSubscriber> subscriber(Function<Username, Publisher<Rules>> ruleLoader) {
- return Optional.of(new ReactiveSubscriber() {
- @Override
- public Publisher<Void> handleReactive(Event event) {
- if (event instanceof RuleSetDefined) {
- return persist((RuleSetDefined) event);
- }
- if (event instanceof IncrementalRuleChange) {
- return persist((IncrementalRuleChange) event);
- }
- throw new RuntimeException("Unsupported event");
- }
+ return Optional.of(this);
+ }
- private Mono<Void> persist(RuleSetDefined ruleSetDefined) {
- return persistRules(ruleSetDefined.getAggregateId(), ruleSetDefined.eventId(), ruleSetDefined.getRules());
- }
+ @Override
+ public Publisher<Void> handleReactive(EventWithState eventWithState) {
+ Event event = eventWithState.event();
+ FilteringAggregate.FilterState state = (FilteringAggregate.FilterState) eventWithState.state().get();
+ return persistRules(event.getAggregateId(), event.eventId(), state.getRules());
+ }
- private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
- try {
- return executor.executeVoid(insertStatement.bind()
- .setString(AGGREGATE_ID, aggregateId.asAggregateKey())
- .setInt(EVENT_ID, eventId.value())
- .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules))));
- } catch (JsonProcessingException e) {
- return Mono.error(e);
- }
- }
-
- private Mono<Void> persist(IncrementalRuleChange incrementalRuleChange) {
- FilteringAggregateId filteringAggregateId = (FilteringAggregateId) incrementalRuleChange.getAggregateId();
- return Mono.from(ruleLoader.apply(filteringAggregateId.getUsername()))
- .flatMap(rules -> persistRules(filteringAggregateId, incrementalRuleChange.eventId(), ImmutableList.copyOf(rules.getRules())));
- }
- });
+ private Mono<Void> persistRules(AggregateId aggregateId, EventId eventId, ImmutableList<Rule> rules) {
+ try {
+ return executor.executeVoid(insertStatement.bind()
+ .setString(AGGREGATE_ID, aggregateId.asAggregateKey())
+ .setInt(EVENT_ID, eventId.value())
+ .setString(RULES, objectMapper.writeValueAsString(RuleDTO.from(rules))));
+ } catch (JsonProcessingException e) {
+ return Mono.error(e);
+ }
}
private Version parseVersion(Row row) {
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
index ac64b5fe32..252452f77c 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/DefineRulesCommandHandler.java
@@ -22,14 +22,13 @@ package org.apache.james.jmap.api.filtering.impl;
import java.util.List;
import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;
public class DefineRulesCommandHandler implements CommandHandler<DefineRulesCommand> {
-
private final EventStore eventStore;
public DefineRulesCommandHandler(EventStore eventStore) {
@@ -42,7 +41,7 @@ public class DefineRulesCommandHandler implements CommandHandler<DefineRulesComm
}
@Override
- public Publisher<List<? extends Event>> handle(DefineRulesCommand storeCommand) {
+ public Publisher<List<EventWithState>> handle(DefineRulesCommand storeCommand) {
FilteringAggregateId aggregateId = new FilteringAggregateId(storeCommand.getUsername());
return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
index 2828613713..d032f85c70 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/filtering/impl/FilteringAggregate.java
@@ -24,6 +24,8 @@ import java.util.Optional;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
+import org.apache.james.eventsourcing.ImmutableState;
import org.apache.james.eventsourcing.eventstore.History;
import org.apache.james.jmap.api.exception.StateMismatchException;
import org.apache.james.jmap.api.filtering.Rule;
@@ -33,6 +35,8 @@ import org.apache.james.jmap.api.filtering.Version;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
+import scala.Some;
+
public class FilteringAggregate {
private static final boolean ENABLE_INCREMENTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.increments.enabled", "true"));
private static final boolean ENABLE_SNAPSHOTS = Boolean.parseBoolean(System.getProperty("james.jmap.filters.eventsource.snapshots.enabled", "true"));
@@ -58,6 +62,18 @@ public class FilteringAggregate {
}
}
+ public static class FilterState implements ImmutableState {
+ private final ImmutableList<Rule> rules;
+
+ public FilterState(ImmutableList<Rule> rules) {
+ this.rules = rules;
+ }
+
+ public ImmutableList<Rule> getRules() {
+ return rules;
+ }
+ }
+
private final FilteringAggregateId aggregateId;
private final History history;
private State state;
@@ -69,33 +85,30 @@ public class FilteringAggregate {
this.history = history;
}
- public List<? extends Event> defineRules(DefineRulesCommand storeCommand) {
+ public List<EventWithState> defineRules(DefineRulesCommand storeCommand) {
Preconditions.checkArgument(shouldNotContainDuplicates(storeCommand.getRules()));
StateMismatchException.checkState(expectedState(storeCommand.getIfInState()), "Provided state must be as same as the current state");
- ImmutableList<Event> events = generateEvents(storeCommand);
- events.forEach(this::apply);
- return events;
+ Event event = generateEvent(storeCommand);
+ apply(event);
+ return ImmutableList.of(new EventWithState(event, Some.apply(new FilterState(state.rules))));
}
- private ImmutableList<Event> generateEvents(DefineRulesCommand storeCommand) {
+
+ private Event generateEvent(DefineRulesCommand storeCommand) {
EventId nextEventId = history.getNextEventId();
if (ENABLE_INCREMENTS) {
// SNAPSHOT periodically
if (ENABLE_SNAPSHOTS && history.getEvents().size() >= 100) {
- return resetRules(storeCommand, nextEventId);
+ return new RuleSetDefined(aggregateId, nextEventId, ImmutableList.copyOf(storeCommand.getRules()));
}
- return IncrementalRuleChange.ofDiff(aggregateId, nextEventId, state.rules, storeCommand.getRules())
- .map(ImmutableList::<Event>of)
- .orElseGet(() -> resetRules(storeCommand, nextEventId));
+ return IncrementalRuleChange.ofDiff(aggregateId, history.getNextEventId(), state.rules, storeCommand.getRules())
+ .map(Event.class::cast)
+ .orElseGet(() -> new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules())));
} else {
- return resetRules(storeCommand, nextEventId);
+ return new RuleSetDefined(aggregateId, history.getNextEventId(), ImmutableList.copyOf(storeCommand.getRules()));
}
}
- private ImmutableList<Event> resetRules(DefineRulesCommand storeCommand, EventId nextEventId) {
- return ImmutableList.of(new RuleSetDefined(aggregateId, nextEventId, ImmutableList.copyOf(storeCommand.getRules())));
- }
-
private boolean shouldNotContainDuplicates(List<Rule> rules) {
long uniqueIdCount = rules.stream()
.map(Rule::getId)
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java
index b11d19167a..b8dd786355 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/aggregates/DLPDomainConfiguration.java
@@ -30,6 +30,7 @@ import org.apache.james.dlp.eventsourcing.events.ConfigurationItemsAdded;
import org.apache.james.dlp.eventsourcing.events.ConfigurationItemsRemoved;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.History;
import com.google.common.collect.ImmutableList;
@@ -80,30 +81,31 @@ public class DLPDomainConfiguration {
return new DLPRules(ImmutableList.copyOf(state.rules));
}
- public List<Event> clear() {
+ public List<EventWithState> clear() {
ImmutableList<DLPConfigurationItem> rules = retrieveRules().getItems();
if (!rules.isEmpty()) {
- ImmutableList<Event> events = ImmutableList.of(new ConfigurationItemsRemoved(aggregateId, history.getNextEventId(), rules));
- events.forEach(this::apply);
- return events;
+ Event event = new ConfigurationItemsRemoved(aggregateId, history.getNextEventId(), rules);
+ apply(event);
+ return ImmutableList.of(EventWithState.noState(event));
} else {
return ImmutableList.of();
}
}
- public List<Event> store(DLPRules updatedRules) {
+ public List<EventWithState> store(DLPRules updatedRules) {
ImmutableSet<DLPConfigurationItem> existingRules = retrieveRules().getItems().stream().collect(ImmutableSet.toImmutableSet());
ImmutableSet<DLPConfigurationItem> updatedRulesSet = ImmutableSet.copyOf(updatedRules);
Optional<Event> removedRulesEvent = generateRemovedRulesEvent(existingRules, updatedRulesSet);
Optional<Event> addedRulesEvent = generateAddedRulesEvent(existingRules, updatedRulesSet, computeNextEventId(removedRulesEvent));
- ImmutableList<Event> events = Stream
+ ImmutableList<EventWithState> events = Stream
.of(removedRulesEvent, addedRulesEvent)
.flatMap(Optional::stream)
+ .map(EventWithState::noState)
.collect(ImmutableList.toImmutableList());
- events.forEach(this::apply);
+ events.forEach(e -> apply(e.event()));
return events;
}
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
index e9ed63e4e0..a617632829 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/ClearCommandHandler.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;
@@ -44,7 +44,7 @@ public class ClearCommandHandler implements CommandHandler<ClearCommand> {
}
@Override
- public Publisher<List<? extends Event>> handle(ClearCommand clearCommand) {
+ public Publisher<List<EventWithState>> handle(ClearCommand clearCommand) {
DLPAggregateId aggregateId = new DLPAggregateId(clearCommand.getDomain());
return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
diff --git a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
index 0d628d5bca..337bef2751 100644
--- a/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
+++ b/server/data/data-library/src/main/java/org/apache/james/dlp/eventsourcing/commands/StoreCommandHandler.java
@@ -24,7 +24,7 @@ import java.util.List;
import org.apache.james.dlp.eventsourcing.aggregates.DLPAggregateId;
import org.apache.james.dlp.eventsourcing.aggregates.DLPDomainConfiguration;
import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;
@@ -44,7 +44,7 @@ public class StoreCommandHandler implements CommandHandler<StoreCommand> {
}
@Override
- public Publisher<List<? extends Event>> handle(StoreCommand storeCommand) {
+ public Publisher<List<EventWithState>> handle(StoreCommand storeCommand) {
DLPAggregateId aggregateId = new DLPAggregateId(storeCommand.getDomain());
return Mono.from(eventStore.getEventsOfAggregate(aggregateId))
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
index 3d0bc6d2c5..1d0011b8f0 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionTask.java
@@ -26,8 +26,10 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.james.core.Username;
import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.jmap.api.filtering.Rules;
import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
+import org.apache.james.jmap.api.filtering.impl.FilteringAggregate;
import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
import org.apache.james.json.DTOModule;
@@ -43,6 +45,7 @@ import com.google.common.collect.ImmutableList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import scala.Some;
public class PopulateFilteringProjectionTask implements Task {
static final TaskType TASK_TYPE = TaskType.of("PopulateFilteringProjectionTask");
@@ -148,8 +151,9 @@ public class PopulateFilteringProjectionTask implements Task {
.block();
}
- private RuleSetDefined asEvent(Username user, Rules rules, EventId eventId) {
- return new RuleSetDefined(new FilteringAggregateId(user), eventId, ImmutableList.copyOf(rules.getRules()));
+ private EventWithState asEvent(Username user, Rules rules, EventId eventId) {
+ return new EventWithState(new RuleSetDefined(new FilteringAggregateId(user), eventId, ImmutableList.copyOf(rules.getRules())),
+ Some.apply(new FilteringAggregate.FilterState(ImmutableList.copyOf(rules.getRules()))));
}
@Override
diff --git a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
index 337d63f965..21546d1c5a 100644
--- a/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
+++ b/server/protocols/webadmin/webadmin-jmap/src/test/java/org/apache/james/webadmin/data/jmap/PopulateFilteringProjectionRequestToTaskTest.java
@@ -22,7 +22,6 @@ package org.apache.james.webadmin.data.jmap;
import static io.restassured.RestAssured.given;
import static io.restassured.RestAssured.when;
import static io.restassured.RestAssured.with;
-import static javax.mail.Flags.Flag.DELETED;
import static org.apache.james.jmap.api.filtering.Rule.Condition.Comparator.CONTAINS;
import static org.apache.james.jmap.api.filtering.Rule.Condition.Field.SUBJECT;
import static org.assertj.core.api.Assertions.assertThat;
@@ -35,12 +34,10 @@ import static org.mockito.Mockito.verify;
import java.util.Optional;
-import javax.mail.Flags;
-
import org.apache.james.core.Username;
import org.apache.james.domainlist.api.DomainList;
-import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.ReactiveSubscriber;
import org.apache.james.jmap.api.filtering.Rule;
import org.apache.james.jmap.api.filtering.Rules;
@@ -48,14 +45,10 @@ import org.apache.james.jmap.api.filtering.Version;
import org.apache.james.jmap.api.filtering.impl.EventSourcingFilteringManagement;
import org.apache.james.jmap.api.filtering.impl.FilteringAggregateId;
import org.apache.james.jmap.api.filtering.impl.RuleSetDefined;
-import org.apache.james.jmap.memory.projections.MemoryEmailQueryView;
import org.apache.james.json.DTOConverter;
import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.MessageManager;
-import org.apache.james.mailbox.extension.PreDeletionHook;
import org.apache.james.mailbox.inmemory.InMemoryMailboxManager;
import org.apache.james.mailbox.inmemory.manager.InMemoryIntegrationResources;
-import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.task.Hostname;
@@ -63,7 +56,6 @@ import org.apache.james.task.MemoryTaskManager;
import org.apache.james.task.TaskManager;
import org.apache.james.user.api.UsersRepository;
import org.apache.james.user.memory.MemoryUsersRepository;
-import org.apache.james.util.streams.Limit;
import org.apache.james.webadmin.Routes;
import org.apache.james.webadmin.WebAdminServer;
import org.apache.james.webadmin.WebAdminUtils;
@@ -238,13 +230,13 @@ class PopulateFilteringProjectionRequestToTaskTest {
.basePath(TasksRoutes.BASE)
.get(taskId + "/await");
- ArgumentCaptor<Event> captor = ArgumentCaptor.forClass(Event.class);
+ ArgumentCaptor<EventWithState> captor = ArgumentCaptor.forClass(EventWithState.class);
verify(subscriber, times(1)).handleReactive(captor.capture());
- assertThat(captor.getValue().eventId()).isEqualTo(EventId.fromSerialized(4));
- assertThat(captor.getValue().getAggregateId()).isEqualTo(new FilteringAggregateId(BOB));
- assertThat(captor.getValue()).isInstanceOf(RuleSetDefined.class);
- RuleSetDefined ruleSetDefined = (RuleSetDefined) captor.getValue();
+ assertThat(captor.getValue().event().eventId()).isEqualTo(EventId.fromSerialized(4));
+ assertThat(captor.getValue().event().getAggregateId()).isEqualTo(new FilteringAggregateId(BOB));
+ assertThat(captor.getValue().event()).isInstanceOf(RuleSetDefined.class);
+ RuleSetDefined ruleSetDefined = (RuleSetDefined) captor.getValue().event();
assertThat(ruleSetDefined.getRules()).containsOnly(rule);
}
}
\ No newline at end of file
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java
index 50f540718e..51e1cdcda7 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/ConfigurationAggregate.java
@@ -24,6 +24,7 @@ import java.util.Optional;
import org.apache.james.eventsourcing.AggregateId;
import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.History;
import com.google.common.base.Preconditions;
@@ -54,7 +55,7 @@ class ConfigurationAggregate {
return new ConfigurationAggregate(aggregateId, history);
}
- private static final List<? extends Event> EMPTY_EVENTS = ImmutableList.of();
+ private static final List<EventWithState> EMPTY_EVENTS = ImmutableList.of();
private final AggregateId aggregateId;
private final History history;
@@ -68,7 +69,7 @@ class ConfigurationAggregate {
history.getEventsJava().forEach(this::apply);
}
- List<? extends Event> registerConfiguration(CassandraMailQueueViewConfiguration configuration) {
+ List<EventWithState> registerConfiguration(CassandraMailQueueViewConfiguration configuration) {
boolean isSame = state.maybeConfiguration.map(configuration::equals).orElse(false);
if (isSame) {
return EMPTY_EVENTS;
@@ -76,10 +77,10 @@ class ConfigurationAggregate {
state.maybeConfiguration.ifPresent(oldConfiguration -> oldConfiguration.validateConfigurationChange(configuration));
- return ImmutableList.of(new ConfigurationChanged(
+ return ImmutableList.of(EventWithState.noState(new ConfigurationChanged(
aggregateId,
history.getNextEventId(),
- configuration));
+ configuration)));
}
Optional<CassandraMailQueueViewConfiguration> getCurrentConfiguration() {
diff --git a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
index bf54102d8e..415e0b5af4 100644
--- a/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
+++ b/server/queue/queue-rabbitmq/src/main/java/org/apache/james/queue/rabbitmq/view/cassandra/configuration/RegisterConfigurationCommandHandler.java
@@ -22,7 +22,7 @@ package org.apache.james.queue.rabbitmq.view.cassandra.configuration;
import java.util.List;
import org.apache.james.eventsourcing.CommandHandler;
-import org.apache.james.eventsourcing.Event;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.eventsourcing.eventstore.EventStore;
import org.reactivestreams.Publisher;
@@ -42,7 +42,7 @@ class RegisterConfigurationCommandHandler implements CommandHandler<RegisterConf
}
@Override
- public Publisher<List<? extends Event>> handle(RegisterConfigurationCommand command) {
+ public Publisher<List<EventWithState>> handle(RegisterConfigurationCommand command) {
return Mono.from(eventStore.getEventsOfAggregate(command.getAggregateId()))
.map(history -> ConfigurationAggregate
.load(command.getAggregateId(), history)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index de682c1018..e1c4ab8309 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -21,11 +21,10 @@ package org.apache.james.task.eventsourcing
import java.util.List
import org.apache.james.eventsourcing.eventstore.History
-import org.apache.james.eventsourcing.{CommandHandler, Event}
+import org.apache.james.eventsourcing.{CommandHandler, Event, EventWithState}
import org.apache.james.task.eventsourcing.TaskCommand._
import org.apache.james.task.{Hostname, TaskId}
import org.reactivestreams.Publisher
-
import reactor.core.scala.publisher.SMono
import scala.jdk.CollectionConverters._
@@ -41,8 +40,8 @@ sealed abstract class TaskCommandHandler[T <: TaskCommand] extends CommandHandle
class CreateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History], hostname: Hostname) extends TaskCommandHandler[Create] {
override def handledClass: Class[Create] = classOf[Create]
- override def handle(command: Create): Publisher[List[_ <: Event]] = {
- SMono.fromCallable(() => TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname).asJava)
+ override def handle(command: Create): Publisher[List[EventWithState]] = {
+ SMono.fromCallable(() => TaskAggregate.create(TaskAggregateId(command.id), command.task, hostname).map(EventWithState.noState).asJava)
}
}
@@ -50,8 +49,8 @@ class StartCommandHandler(private val loadHistory: TaskAggregateId => SMono[Hist
private val hostname: Hostname) extends TaskCommandHandler[Start] {
override def handledClass: Class[Start] = classOf[Start]
- override def handle(command: Start): Publisher[List[_ <: Event]] = {
- loadAggregate(loadHistory, command.id).map(_.start(hostname).asJava)
+ override def handle(command: Start): Publisher[List[EventWithState]] = {
+ loadAggregate(loadHistory, command.id).map(_.start(hostname).map(EventWithState.noState).asJava)
}
}
@@ -59,39 +58,39 @@ class RequestCancelCommandHandler(private val loadHistory: TaskAggregateId => SM
private val hostname: Hostname) extends TaskCommandHandler[RequestCancel] {
override def handledClass: Class[RequestCancel] = classOf[RequestCancel]
- override def handle(command: RequestCancel): Publisher[List[_ <: Event]] = {
- loadAggregate(loadHistory, command.id).map(_.requestCancel(hostname).asJava)
+ override def handle(command: RequestCancel): Publisher[List[EventWithState]] = {
+ loadAggregate(loadHistory, command.id).map(_.requestCancel(hostname).map(EventWithState.noState).asJava)
}
}
class CompleteCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Complete] {
override def handledClass: Class[Complete] = classOf[Complete]
- override def handle(command: Complete): Publisher[List[_ <: Event]] = {
- loadAggregate(loadHistory, command.id).map(_.complete(command.result, command.additionalInformation).asJava)
+ override def handle(command: Complete): Publisher[List[EventWithState]] = {
+ loadAggregate(loadHistory, command.id).map(_.complete(command.result, command.additionalInformation).map(EventWithState.noState).asJava)
}
}
class CancelCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Cancel] {
override def handledClass: Class[Cancel] = classOf[Cancel]
- override def handle(command: Cancel): Publisher[List[_ <: Event]] = {
- loadAggregate(loadHistory, command.id).map(_.cancel(command.additionalInformation).asJava)
+ override def handle(command: Cancel): Publisher[List[EventWithState]] = {
+ loadAggregate(loadHistory, command.id).map(_.cancel(command.additionalInformation).map(EventWithState.noState).asJava)
}
}
class FailCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[Fail] {
override def handledClass: Class[Fail] = classOf[Fail]
- override def handle(command: Fail): Publisher[List[_ <: Event]] = {
- loadAggregate(loadHistory, command.id).map(_.fail(command.additionalInformation, command.errorMessage, command.exception).asJava)
+ override def handle(command: Fail): Publisher[List[EventWithState]] = {
+ loadAggregate(loadHistory, command.id).map(_.fail(command.additionalInformation, command.errorMessage, command.exception).map(EventWithState.noState).asJava)
}
}
class UpdateCommandHandler(private val loadHistory: TaskAggregateId => SMono[History]) extends TaskCommandHandler[UpdateAdditionalInformation] {
override def handledClass: Class[UpdateAdditionalInformation] = classOf[UpdateAdditionalInformation]
- override def handle(command: UpdateAdditionalInformation): Publisher[List[_ <: Event]] = {
- loadAggregate(loadHistory, command.id).map(_.update(command.additionalInformation).asJava)
+ override def handle(command: UpdateAdditionalInformation): Publisher[List[EventWithState]] = {
+ loadAggregate(loadHistory, command.id).map(_.update(command.additionalInformation).map(EventWithState.noState).asJava)
}
}
\ No newline at end of file
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index 3ea869dd8b..607e843f16 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -42,7 +42,7 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
val terminationSubscriber: TerminationSubscriber) extends TaskManager with Closeable with Startable {
val LOGGER: Logger = LoggerFactory.getLogger(classOf[EventSourcingTaskManager])
- private def workDispatcher: Subscriber = {
+ private def workDispatcher: Subscriber = event => event.event match {
case Created(aggregateId, _, task, _) =>
val taskWithId = new TaskWithId(aggregateId.taskId, task)
workQueue.submit(taskWithId)
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 22fa7d64cf..5763eab41d 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -29,7 +29,7 @@ import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._
trait TaskExecutionDetailsProjection {
- def asSubscriber(hostname: Hostname): ReactiveSubscriber = {
+ def asSubscriber(hostname: Hostname): ReactiveSubscriber = event => event.event match {
case created: Created =>
updateReactive(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname))
case cancelRequested: CancelRequested =>
diff --git a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
index 1dabb19bb9..2efdd917f7 100644
--- a/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
+++ b/server/task/task-memory/src/main/scala/org/apache/james/task/eventsourcing/TerminationSubscriber.scala
@@ -19,15 +19,15 @@
package org.apache.james.task.eventsourcing
-import org.apache.james.eventsourcing.{Event, ReactiveSubscriber}
+import org.apache.james.eventsourcing.{Event, EventWithState, ReactiveSubscriber}
import org.reactivestreams.Publisher
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
import reactor.core.publisher.{Mono, Sinks}
trait TerminationSubscriber extends ReactiveSubscriber {
- override def handleReactive(event: Event): Publisher[Void] = Mono.fromRunnable(() => handle(event))
+ override def handleReactive(event: EventWithState): Publisher[Void] = Mono.fromRunnable(() => handle(event))
- override def handle(event: Event): Unit = event match {
+ override def handle(event: EventWithState): Unit = event.event match {
case event: TerminalTaskEvent => addEvent(event)
case _ =>
}
diff --git a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
index 1a2c801292..ec450342a6 100644
--- a/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
+++ b/server/task/task-memory/src/test/java/org/apache/james/task/eventsourcing/TerminationSubscriberContract.java
@@ -31,6 +31,7 @@ import java.util.concurrent.Executors;
import org.apache.james.eventsourcing.Event;
import org.apache.james.eventsourcing.EventId;
+import org.apache.james.eventsourcing.EventWithState;
import org.apache.james.task.Hostname;
import org.apache.james.task.Task;
import org.apache.james.task.TaskId;
@@ -151,6 +152,7 @@ public interface TerminationSubscriberContract {
.flatMapMany(ignored -> Flux.fromArray(events)
.subscribeOn(Schedulers.fromExecutor(EXECUTOR))
.delayElements(DELAY_BETWEEN_EVENTS)
+ .map(EventWithState::noState)
.doOnNext(subscriber::handle))
.subscribe();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org