You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:45 UTC
[james-project] 17/33: JAMES-3491 Have an EventBus for JMAP
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b349944f46bb691208982838ec71fad79ef20a58
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 3 16:06:24 2021 +0700
JAMES-3491 Have an EventBus for JMAP
---
.../org/apache/james/CassandraJamesServerMain.java | 2 +
.../james/CassandraRabbitMQJamesServerMain.java | 2 +
.../james/modules/event/JMAPEventBusModule.java | 92 ++++++++++++++++++++++
.../org/apache/james/MemoryJamesServerMain.java | 2 +
.../modules/protocols/JmapEventBusModule.java | 14 ++++
.../james/jmap/change/JmapEventSerializer.scala | 17 ++--
.../james/jmap/change/MailboxChangeListener.scala | 28 +++++--
.../jmap/change/MailboxChangeListenerTest.scala | 16 +++-
.../java/org/apache/james/jmap/InjectionKeys.java | 24 ++++++
9 files changed, 179 insertions(+), 18 deletions(-)
diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java
index 40bfa50..908f486 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java
@@ -46,6 +46,7 @@ import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
import org.apache.james.modules.metrics.CassandraMetricsModule;
import org.apache.james.modules.protocols.IMAPServerModule;
import org.apache.james.modules.protocols.JMAPServerModule;
+import org.apache.james.modules.protocols.JmapEventBusModule;
import org.apache.james.modules.protocols.LMTPServerModule;
import org.apache.james.modules.protocols.ManageSieveServerModule;
import org.apache.james.modules.protocols.POP3ServerModule;
@@ -111,6 +112,7 @@ public class CassandraJamesServerMain implements JamesServerMain {
new ProtocolHandlerModule(),
new SMTPServerModule(),
new JMAPServerModule(),
+ new JmapEventBusModule(),
WEBADMIN);
public static final Module PLUGINS = Modules.combine(
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index c0c23d9..69c1ee5 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -26,6 +26,7 @@ import org.apache.james.modules.DistributedTaskSerializationModule;
import org.apache.james.modules.blobstore.BlobStoreCacheModulesChooser;
import org.apache.james.modules.blobstore.BlobStoreConfiguration;
import org.apache.james.modules.blobstore.BlobStoreModulesChooser;
+import org.apache.james.modules.event.JMAPEventBusModule;
import org.apache.james.modules.event.RabbitMQEventBusModule;
import org.apache.james.modules.queue.rabbitmq.RabbitMQModule;
import org.apache.james.modules.server.JMXServerModule;
@@ -40,6 +41,7 @@ public class CassandraRabbitMQJamesServerMain implements JamesServerMain {
.override(Modules.combine(REQUIRE_TASK_MANAGER_MODULE, new DistributedTaskManagerModule()))
.with(new RabbitMQModule(),
new RabbitMailQueueRoutesModule(),
+ new JMAPEventBusModule(),
new RabbitMQEventBusModule(),
new DistributedTaskSerializationModule());
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
new file mode 100644
index 0000000..aef6f3b
--- /dev/null
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -0,0 +1,92 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.modules.event;
+
+import javax.inject.Named;
+
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.events.EventBus;
+import org.apache.james.events.EventBusId;
+import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.KeyReconnectionHandler;
+import org.apache.james.events.NamingStrategy;
+import org.apache.james.events.RabbitMQEventBus;
+import org.apache.james.events.RetryBackoffConfiguration;
+import org.apache.james.events.RoutingKeyConverter;
+import org.apache.james.jmap.InjectionKeys;
+import org.apache.james.jmap.change.Factory;
+import org.apache.james.jmap.change.JmapEventSerializer;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Names;
+
+import reactor.rabbitmq.Sender;
+
+/**
+ * Guice module wiring a dedicated {@link RabbitMQEventBus} for JMAP.
+ *
+ * Every binding declared here is qualified with {@code @Named(InjectionKeys.JMAP)}.
+ */
+public class JMAPEventBusModule extends AbstractModule {
+ // Naming strategy handed to both the JMAP RabbitMQEventBus and its KeyReconnectionHandler.
+ public static final NamingStrategy JMAP_NAMING_STRATEGY = new NamingStrategy("jmapEvent");
+
+ @Override
+ protected void configure() {
+ // EventBusId.random() is evaluated once, while the module is configured, and bound as an instance.
+ bind(EventBusId.class).annotatedWith(Names.named(InjectionKeys.JMAP)).toInstance(EventBusId.random());
+ }
+
+ /**
+ * Contributes an initialization operation that calls {@code start} on the JMAP event bus.
+ *
+ * @param instance the JMAP-qualified RabbitMQ event bus to start
+ * @return the initialization operation, declared for {@link RabbitMQEventBus}
+ */
+ @ProvidesIntoSet
+ InitializationOperation workQueue(@Named(InjectionKeys.JMAP) RabbitMQEventBus instance) {
+ return InitilizationOperationBuilder
+ .forClass(RabbitMQEventBus.class)
+ .init(instance::start);
+ }
+
+ /**
+ * Contributes a {@link KeyReconnectionHandler} built from the JMAP naming strategy and the JMAP event bus id.
+ *
+ * @param eventBusId the JMAP-qualified event bus id
+ * @return the reconnection handler added to the injected set of handlers
+ */
+ @ProvidesIntoSet
+ SimpleConnectionPool.ReconnectionHandler provideReconnectionHandler(@Named(InjectionKeys.JMAP) EventBusId eventBusId) {
+ return new KeyReconnectionHandler(JMAP_NAMING_STRATEGY, eventBusId);
+ }
+
+ /**
+ * Builds the singleton JMAP {@link RabbitMQEventBus}.
+ *
+ * The bus uses {@link #JMAP_NAMING_STRATEGY}, the {@link JmapEventSerializer}, and a
+ * {@link RoutingKeyConverter} that knows a single {@code org.apache.james.jmap.change.Factory}.
+ * All other collaborators are the ones injected as parameters.
+ */
+ @Provides
+ @Singleton
+ @Named(InjectionKeys.JMAP)
+ RabbitMQEventBus provideJmapEventBus(Sender sender, ReceiverProvider receiverProvider,
+ JmapEventSerializer eventSerializer,
+ RetryBackoffConfiguration retryBackoffConfiguration,
+ EventDeadLetters eventDeadLetters,
+ MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool,
+ @Named(InjectionKeys.JMAP) EventBusId eventBusId) {
+ return new RabbitMQEventBus(
+ JMAP_NAMING_STRATEGY,
+ sender, receiverProvider, eventSerializer, retryBackoffConfiguration, new RoutingKeyConverter(ImmutableSet.of(new Factory())),
+ eventDeadLetters, metricFactory, channelPool, eventBusId);
+ }
+
+ /**
+ * Exposes the JMAP {@link RabbitMQEventBus} under the {@link EventBus} type, with the same JMAP qualifier.
+ *
+ * @param rabbitMQEventBus the JMAP-qualified RabbitMQ event bus
+ * @return the very same instance, typed as {@link EventBus}
+ */
+ @Provides
+ @Singleton
+ @Named(InjectionKeys.JMAP)
+ EventBus provideJmapEventBus(@Named(InjectionKeys.JMAP) RabbitMQEventBus rabbitMQEventBus) {
+ return rabbitMQEventBus;
+ }
+}
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
index 152693e..33d63c8 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
@@ -32,6 +32,7 @@ import org.apache.james.modules.eventstore.MemoryEventStoreModule;
import org.apache.james.modules.mailbox.MemoryMailboxModule;
import org.apache.james.modules.protocols.IMAPServerModule;
import org.apache.james.modules.protocols.JMAPServerModule;
+import org.apache.james.modules.protocols.JmapEventBusModule;
import org.apache.james.modules.protocols.LMTPServerModule;
import org.apache.james.modules.protocols.ManageSieveServerModule;
import org.apache.james.modules.protocols.POP3ServerModule;
@@ -99,6 +100,7 @@ public class MemoryJamesServerMain implements JamesServerMain {
new SpamAssassinListenerModule());
public static final Module JMAP = Modules.combine(
+ new JmapEventBusModule(),
new JmapTasksModule(),
new MemoryDataJmapModule(),
new JMAPServerModule());
diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
new file mode 100644
index 0000000..9faba9d
--- /dev/null
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
@@ -0,0 +1,14 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.modules.protocols;
+
+import org.apache.james.events.EventBus;
+import org.apache.james.jmap.InjectionKeys;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+/**
+ * Guice module that makes the {@link EventBus} qualified with {@code InjectionKeys.JMAP}
+ * resolve to the unqualified {@link EventBus} binding.
+ */
+public class JmapEventBusModule extends AbstractModule {
+ @Override
+ protected void configure() {
+ // Links the JMAP-named key to the plain EventBus key: no separate bus is created here.
+ bind(EventBus.class).annotatedWith(Names.named(InjectionKeys.JMAP)).to(EventBus.class);
+ }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
index 2c4e28c..ea5461e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
@@ -22,14 +22,12 @@ package org.apache.james.jmap.change
import java.util.Optional
import com.fasterxml.jackson.annotation.JsonProperty
-import javax.inject.Inject
import org.apache.james.core.Username
import org.apache.james.events.Event.EventId
import org.apache.james.events.{Event, EventSerializer}
import org.apache.james.jmap.core.State
import org.apache.james.json.JsonGenericSerializer
-import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
object StateChangeEventDTO {
@@ -60,17 +58,14 @@ case class StateChangeEventDTO(@JsonProperty("type") getType: String,
emailState = getEmailState.toScala.map(State.fromStringUnchecked))
}
-case class JmapEventSerializer(dtoModules: Set[EventDTOModule[Event, EventDTO]]) extends EventSerializer {
- @Inject
- def this(javaModules: java.util.Set[EventDTOModule[Event, EventDTO]]) {
- this(javaModules.asScala.toSet)
- }
-
- private val genericSerializer: JsonGenericSerializer[Event, EventDTO] = JsonGenericSerializer
- .forModules(dtoModules.asJava)
+case class JmapEventSerializer() extends EventSerializer {
+ private val genericSerializer: JsonGenericSerializer[StateChangeEvent, StateChangeEventDTO] = JsonGenericSerializer
+ .forModules(StateChangeEventDTO.dtoModule)
.withoutNestedType()
- override def toJson(event: Event): String = genericSerializer.serialize(event)
+ override def toJson(event: Event): String = event match {
+ case stateChangeEvent: StateChangeEvent => genericSerializer.serialize(stateChangeEvent)
+ }
override def asEvent(serialized: String): Event = genericSerializer.deserialize(serialized)
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 1f6986c..627941f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -21,13 +21,16 @@ package org.apache.james.jmap.change
import java.time.{Clock, ZonedDateTime}
-import javax.inject.Inject
+import javax.inject.{Inject, Named}
import org.apache.james.core.Username
+import org.apache.james.events.Event.EventId
import org.apache.james.events.EventListener.ReactiveGroupEventListener
-import org.apache.james.events.{Event, Group}
+import org.apache.james.events.{Event, EventBus, Group, RegistrationKey}
+import org.apache.james.jmap.InjectionKeys
import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxChange, MailboxChangeRepository}
import org.apache.james.jmap.api.model.AccountId
import org.apache.james.jmap.change.MailboxChangeListener.LOGGER
+import org.apache.james.jmap.core.State
import org.apache.james.mailbox.events.MailboxEvents.{Added, Expunged, FlagsUpdated, MailboxACLUpdated, MailboxAdded, MailboxDeletion, MailboxEvent, MailboxRenamed}
import org.apache.james.mailbox.exception.MailboxException
import org.apache.james.mailbox.model.{MailboxACL, MailboxId}
@@ -45,7 +48,8 @@ object MailboxChangeListener {
val LOGGER: Logger = LoggerFactory.getLogger(classOf[MailboxChangeListener])
}
-case class MailboxChangeListener @Inject() (mailboxChangeRepository: MailboxChangeRepository,
+case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: EventBus,
+ mailboxChangeRepository: MailboxChangeRepository,
mailboxChangeFactory: MailboxChange.Factory,
emailChangeRepository: EmailChangeRepository,
emailChangeFactory: EmailChange.Factory,
@@ -94,10 +98,11 @@ case class MailboxChangeListener @Inject() (mailboxChangeRepository: MailboxChan
}
private def saveChangeEvent(jmapChange: JmapChange): Publisher[Void] =
- jmapChange match {
+ SMono(jmapChange match {
case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange)
case emailChange: EmailChange => emailChangeRepository.save(emailChange)
- }
+ }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), Set[RegistrationKey]().asJava)))
+
private def getSharees(mailboxId: MailboxId, username: Username): List[AccountId] = {
val mailboxSession: MailboxSession = mailboxManager.createSystemSession(username)
@@ -116,4 +121,17 @@ case class MailboxChangeListener @Inject() (mailboxChangeRepository: MailboxChan
List.empty
}
}
+
+ /**
+ * Builds the StateChangeEvent dispatched on the JMAP event bus once a change has been saved.
+ *
+ * The event carries a fresh random EventId and the username derived from the change's account id.
+ * Only the state matching the kind of change is set: an EmailChange fills emailState,
+ * a MailboxChange fills mailboxState; the other one stays None.
+ *
+ * @param jmapChange the change that was just persisted
+ * @return the event describing which state moved for that account
+ */
+ private def toStateChangeEvent(jmapChange: JmapChange): StateChangeEvent = jmapChange match {
+ case emailChange: EmailChange => StateChangeEvent(
+ eventId = EventId.random(),
+ username = Username.of(emailChange.getAccountId.getIdentifier),
+ emailState = Some(State.fromJava(emailChange.getState)),
+ mailboxState = None)
+ // A mailbox change moves the Mailbox state, so it must not be reported as an Email state.
+ case mailboxChange: MailboxChange => StateChangeEvent(
+ eventId = EventId.random(),
+ username = Username.of(mailboxChange.getAccountId.getIdentifier),
+ emailState = None,
+ mailboxState = Some(State.fromJava(mailboxChange.getState)))
+ }
}
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
index c93d891..17a91a5 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
@@ -20,10 +20,11 @@
package org.apache.james.jmap.change
import java.time.{Clock, ZonedDateTime}
+import java.util
import javax.mail.Flags
import org.apache.james.events.delivery.InVmEventDelivery
-import org.apache.james.events.{InVMEventBus, MemoryEventDeadLetters, RetryBackoffConfiguration}
+import org.apache.james.events.{Event, EventBus, EventListener, Group, InVMEventBus, MemoryEventDeadLetters, Registration, RegistrationKey, RetryBackoffConfiguration}
import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, MailboxChange, MailboxChangeRepository, State}
import org.apache.james.jmap.api.model.AccountId
import org.apache.james.jmap.change.MailboxChangeListenerTest.ACCOUNT_ID
@@ -36,6 +37,8 @@ import org.apache.james.mailbox.{MailboxManager, MailboxSessionUtil, MessageMana
import org.apache.james.metrics.tests.RecordingMetricFactory
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.{BeforeEach, Nested, Test}
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
@@ -71,7 +74,16 @@ class MailboxChangeListenerTest {
mailboxChangeRepository = new MemoryMailboxChangeRepository()
emailChangeFactory = new EmailChange.Factory(stateFactory)
emailChangeRepository = new MemoryEmailChangeRepository()
- listener = MailboxChangeListener(mailboxChangeRepository, mailboxChangeFactory, emailChangeRepository, emailChangeFactory, mailboxManager, clock)
+ val eventBus = new EventBus {
+ override def register(listener: EventListener.ReactiveEventListener, key: RegistrationKey): Publisher[Registration] = Mono.empty()
+
+ override def register(listener: EventListener.ReactiveEventListener, group: Group): Registration = () => {}
+
+ override def dispatch(event: Event, key: util.Set[RegistrationKey]): Mono[Void] = Mono.empty()
+
+ override def reDeliver(group: Group, event: Event): Mono[Void] = Mono.empty()
+ }
+ listener = MailboxChangeListener(eventBus, mailboxChangeRepository, mailboxChangeFactory, emailChangeRepository, emailChangeFactory, mailboxManager, clock)
resources.getEventBus.register(listener)
}
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/InjectionKeys.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/InjectionKeys.java
new file mode 100644
index 0000000..7d2c080
--- /dev/null
+++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/InjectionKeys.java
@@ -0,0 +1,24 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap;
+
+/**
+ * Names used to qualify injected dependencies (through {@code @Named} / {@code Names.named}).
+ */
+public interface InjectionKeys {
+ // Qualifier for JMAP-specific bindings, such as the JMAP EventBus.
+ String JMAP = "JMAP";
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org