You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:45 UTC

[james-project] 17/33: JAMES-3491 Have an EventBus for JMAP

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b349944f46bb691208982838ec71fad79ef20a58
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 3 16:06:24 2021 +0700

    JAMES-3491 Have an EventBus for JMAP
---
 .../org/apache/james/CassandraJamesServerMain.java |  2 +
 .../james/CassandraRabbitMQJamesServerMain.java    |  2 +
 .../james/modules/event/JMAPEventBusModule.java    | 92 ++++++++++++++++++++++
 .../org/apache/james/MemoryJamesServerMain.java    |  2 +
 .../modules/protocols/JmapEventBusModule.java      | 14 ++++
 .../james/jmap/change/JmapEventSerializer.scala    | 17 ++--
 .../james/jmap/change/MailboxChangeListener.scala  | 28 +++++--
 .../jmap/change/MailboxChangeListenerTest.scala    | 16 +++-
 .../java/org/apache/james/jmap/InjectionKeys.java  | 24 ++++++
 9 files changed, 179 insertions(+), 18 deletions(-)

diff --git a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java
index 40bfa50..908f486 100644
--- a/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java
+++ b/server/container/guice/cassandra-guice/src/main/java/org/apache/james/CassandraJamesServerMain.java
@@ -46,6 +46,7 @@ import org.apache.james.modules.mailrepository.CassandraMailRepositoryModule;
 import org.apache.james.modules.metrics.CassandraMetricsModule;
 import org.apache.james.modules.protocols.IMAPServerModule;
 import org.apache.james.modules.protocols.JMAPServerModule;
+import org.apache.james.modules.protocols.JmapEventBusModule;
 import org.apache.james.modules.protocols.LMTPServerModule;
 import org.apache.james.modules.protocols.ManageSieveServerModule;
 import org.apache.james.modules.protocols.POP3ServerModule;
@@ -111,6 +112,7 @@ public class CassandraJamesServerMain implements JamesServerMain {
         new ProtocolHandlerModule(),
         new SMTPServerModule(),
         new JMAPServerModule(),
+        new JmapEventBusModule(),
         WEBADMIN);
 
     public static final Module PLUGINS = Modules.combine(
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
index c0c23d9..69c1ee5 100644
--- a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/CassandraRabbitMQJamesServerMain.java
@@ -26,6 +26,7 @@ import org.apache.james.modules.DistributedTaskSerializationModule;
 import org.apache.james.modules.blobstore.BlobStoreCacheModulesChooser;
 import org.apache.james.modules.blobstore.BlobStoreConfiguration;
 import org.apache.james.modules.blobstore.BlobStoreModulesChooser;
+import org.apache.james.modules.event.JMAPEventBusModule;
 import org.apache.james.modules.event.RabbitMQEventBusModule;
 import org.apache.james.modules.queue.rabbitmq.RabbitMQModule;
 import org.apache.james.modules.server.JMXServerModule;
@@ -40,6 +41,7 @@ public class CassandraRabbitMQJamesServerMain implements JamesServerMain {
             .override(Modules.combine(REQUIRE_TASK_MANAGER_MODULE, new DistributedTaskManagerModule()))
             .with(new RabbitMQModule(),
                 new RabbitMailQueueRoutesModule(),
+                new JMAPEventBusModule(),
                 new RabbitMQEventBusModule(),
                 new DistributedTaskSerializationModule());
 
diff --git a/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
new file mode 100644
index 0000000..aef6f3b
--- /dev/null
+++ b/server/container/guice/cassandra-rabbitmq-guice/src/main/java/org/apache/james/modules/event/JMAPEventBusModule.java
@@ -0,0 +1,92 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.modules.event;
+
+import javax.inject.Named;
+
+import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool;
+import org.apache.james.backends.rabbitmq.ReceiverProvider;
+import org.apache.james.backends.rabbitmq.SimpleConnectionPool;
+import org.apache.james.events.EventBus;
+import org.apache.james.events.EventBusId;
+import org.apache.james.events.EventDeadLetters;
+import org.apache.james.events.KeyReconnectionHandler;
+import org.apache.james.events.NamingStrategy;
+import org.apache.james.events.RabbitMQEventBus;
+import org.apache.james.events.RetryBackoffConfiguration;
+import org.apache.james.events.RoutingKeyConverter;
+import org.apache.james.jmap.InjectionKeys;
+import org.apache.james.jmap.change.Factory;
+import org.apache.james.jmap.change.JmapEventSerializer;
+import org.apache.james.metrics.api.MetricFactory;
+import org.apache.james.utils.InitializationOperation;
+import org.apache.james.utils.InitilizationOperationBuilder;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.AbstractModule;
+import com.google.inject.Provides;
+import com.google.inject.Singleton;
+import com.google.inject.multibindings.ProvidesIntoSet;
+import com.google.inject.name.Names;
+
+import reactor.rabbitmq.Sender;
+
+public class JMAPEventBusModule extends AbstractModule { // Guice bindings for a RabbitMQEventBus qualified with @Named(InjectionKeys.JMAP)
+    public static final NamingStrategy JMAP_NAMING_STRATEGY = new NamingStrategy("jmapEvent"); // shared by the JMAP bus and its reconnection handler below
+
+    @Override
+    protected void configure() { // binds one randomly generated EventBusId instance under the JMAP name
+        bind(EventBusId.class).annotatedWith(Names.named(InjectionKeys.JMAP)).toInstance(EventBusId.random());
+    }
+
+    @ProvidesIntoSet // contributes the JMAP bus's start() call to the set of InitializationOperation
+    InitializationOperation workQueue(@Named(InjectionKeys.JMAP) RabbitMQEventBus instance) {
+        return InitilizationOperationBuilder
+            .forClass(RabbitMQEventBus.class)
+            .init(instance::start);
+    }
+
+    @ProvidesIntoSet // contributes a KeyReconnectionHandler built from the JMAP naming strategy and the JMAP EventBusId
+    SimpleConnectionPool.ReconnectionHandler provideReconnectionHandler(@Named(InjectionKeys.JMAP) EventBusId eventBusId) {
+        return new KeyReconnectionHandler(JMAP_NAMING_STRATEGY, eventBusId);
+    }
+
+    @Provides // builds the singleton JMAP RabbitMQEventBus from the injected RabbitMQ collaborators
+    @Singleton
+    @Named(InjectionKeys.JMAP)
+    RabbitMQEventBus provideJmapEventBus(Sender sender, ReceiverProvider receiverProvider,
+                                 JmapEventSerializer eventSerializer,
+                                 RetryBackoffConfiguration retryBackoffConfiguration,
+                                 EventDeadLetters eventDeadLetters,
+                                 MetricFactory metricFactory, ReactorRabbitMQChannelPool channelPool,
+                                 @Named(InjectionKeys.JMAP) EventBusId eventBusId) {
+        return new RabbitMQEventBus(
+            JMAP_NAMING_STRATEGY,
+            sender, receiverProvider, eventSerializer, retryBackoffConfiguration, new RoutingKeyConverter(ImmutableSet.of(new Factory())), // routing keys are converted with the org.apache.james.jmap.change.Factory only
+            eventDeadLetters, metricFactory, channelPool, eventBusId);
+    }
+
+    @Provides // exposes the JMAP RabbitMQEventBus under the EventBus type, keeping the JMAP name
+    @Singleton
+    @Named(InjectionKeys.JMAP)
+    EventBus provideJmapEventBus(@Named(InjectionKeys.JMAP) RabbitMQEventBus rabbitMQEventBus) {
+        return rabbitMQEventBus;
+    }
+}
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
index 152693e..33d63c8 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/MemoryJamesServerMain.java
@@ -32,6 +32,7 @@ import org.apache.james.modules.eventstore.MemoryEventStoreModule;
 import org.apache.james.modules.mailbox.MemoryMailboxModule;
 import org.apache.james.modules.protocols.IMAPServerModule;
 import org.apache.james.modules.protocols.JMAPServerModule;
+import org.apache.james.modules.protocols.JmapEventBusModule;
 import org.apache.james.modules.protocols.LMTPServerModule;
 import org.apache.james.modules.protocols.ManageSieveServerModule;
 import org.apache.james.modules.protocols.POP3ServerModule;
@@ -99,6 +100,7 @@ public class MemoryJamesServerMain implements JamesServerMain {
         new SpamAssassinListenerModule());
 
     public static final Module JMAP = Modules.combine(
+        new JmapEventBusModule(),
         new JmapTasksModule(),
         new MemoryDataJmapModule(),
         new JMAPServerModule());
diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
new file mode 100644
index 0000000..9faba9d
--- /dev/null
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
@@ -0,0 +1,14 @@
+package org.apache.james.modules.protocols;
+
+import org.apache.james.events.EventBus;
+import org.apache.james.jmap.InjectionKeys;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.name.Names;
+
+public class JmapEventBusModule extends AbstractModule { // makes @Named(InjectionKeys.JMAP) EventBus injectable without a dedicated JMAP bus
+    @Override
+    protected void configure() {
+        bind(EventBus.class).annotatedWith(Names.named(InjectionKeys.JMAP)).to(EventBus.class); // not a self-binding: the JMAP-named key is linked to the unannotated EventBus key
+    }
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
index 2c4e28c..ea5461e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/JmapEventSerializer.scala
@@ -22,14 +22,12 @@ package org.apache.james.jmap.change
 import java.util.Optional
 
 import com.fasterxml.jackson.annotation.JsonProperty
-import javax.inject.Inject
 import org.apache.james.core.Username
 import org.apache.james.events.Event.EventId
 import org.apache.james.events.{Event, EventSerializer}
 import org.apache.james.jmap.core.State
 import org.apache.james.json.JsonGenericSerializer
 
-import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
 
 object StateChangeEventDTO {
@@ -60,17 +58,14 @@ case class StateChangeEventDTO(@JsonProperty("type") getType: String,
     emailState = getEmailState.toScala.map(State.fromStringUnchecked))
 }
 
-case class JmapEventSerializer(dtoModules: Set[EventDTOModule[Event, EventDTO]]) extends EventSerializer {
-  @Inject
-  def this(javaModules: java.util.Set[EventDTOModule[Event, EventDTO]]) {
-    this(javaModules.asScala.toSet)
-  }
-
-  private val genericSerializer: JsonGenericSerializer[Event, EventDTO] = JsonGenericSerializer
-    .forModules(dtoModules.asJava)
+case class JmapEventSerializer() extends EventSerializer {
+  private val genericSerializer: JsonGenericSerializer[StateChangeEvent, StateChangeEventDTO] = JsonGenericSerializer
+    .forModules(StateChangeEventDTO.dtoModule)
     .withoutNestedType()
 
-  override def toJson(event: Event): String = genericSerializer.serialize(event)
+  override def toJson(event: Event): String = event match {
+    case stateChangeEvent: StateChangeEvent => genericSerializer.serialize(stateChangeEvent)
+  }
 
   override def asEvent(serialized: String): Event = genericSerializer.deserialize(serialized)
 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 1f6986c..627941f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -21,13 +21,16 @@ package org.apache.james.jmap.change
 
 import java.time.{Clock, ZonedDateTime}
 
-import javax.inject.Inject
+import javax.inject.{Inject, Named}
 import org.apache.james.core.Username
+import org.apache.james.events.Event.EventId
 import org.apache.james.events.EventListener.ReactiveGroupEventListener
-import org.apache.james.events.{Event, Group}
+import org.apache.james.events.{Event, EventBus, Group, RegistrationKey}
+import org.apache.james.jmap.InjectionKeys
 import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxChange, MailboxChangeRepository}
 import org.apache.james.jmap.api.model.AccountId
 import org.apache.james.jmap.change.MailboxChangeListener.LOGGER
+import org.apache.james.jmap.core.State
 import org.apache.james.mailbox.events.MailboxEvents.{Added, Expunged, FlagsUpdated, MailboxACLUpdated, MailboxAdded, MailboxDeletion, MailboxEvent, MailboxRenamed}
 import org.apache.james.mailbox.exception.MailboxException
 import org.apache.james.mailbox.model.{MailboxACL, MailboxId}
@@ -45,7 +48,8 @@ object MailboxChangeListener {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[MailboxChangeListener])
 }
 
-case class MailboxChangeListener @Inject() (mailboxChangeRepository: MailboxChangeRepository,
+case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus: EventBus,
+                                            mailboxChangeRepository: MailboxChangeRepository,
                                             mailboxChangeFactory: MailboxChange.Factory,
                                             emailChangeRepository: EmailChangeRepository,
                                             emailChangeFactory: EmailChange.Factory,
@@ -94,10 +98,11 @@ case class MailboxChangeListener @Inject() (mailboxChangeRepository: MailboxChan
   }
 
   private def saveChangeEvent(jmapChange: JmapChange): Publisher[Void] =
-    jmapChange match {
+    SMono(jmapChange match {
       case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange)
       case emailChange: EmailChange => emailChangeRepository.save(emailChange)
-    }
+    }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), Set[RegistrationKey]().asJava)))
+
 
   private def getSharees(mailboxId: MailboxId, username: Username): List[AccountId] = {
     val mailboxSession: MailboxSession = mailboxManager.createSystemSession(username)
@@ -116,4 +121,17 @@ case class MailboxChangeListener @Inject() (mailboxChangeRepository: MailboxChan
         List.empty
     }
   }
+
+  private def toStateChangeEvent(jmapChange: JmapChange): StateChangeEvent = jmapChange match { // builds the event dispatched after a change is saved
+    case emailChange: EmailChange => StateChangeEvent( // an email change carries a new email state only
+      eventId = EventId.random(),
+      username = Username.of(emailChange.getAccountId.getIdentifier),
+      emailState = Some(State.fromJava(emailChange.getState)),
+      mailboxState = None)
+    case mailboxChange: MailboxChange => StateChangeEvent( // a mailbox change carries a new mailbox state only
+      eventId = EventId.random(),
+      username = Username.of(mailboxChange.getAccountId.getIdentifier),
+      emailState = None,
+      mailboxState = Some(State.fromJava(mailboxChange.getState)))
+  }
 }
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
index c93d891..17a91a5 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
@@ -20,10 +20,11 @@
 package org.apache.james.jmap.change
 
 import java.time.{Clock, ZonedDateTime}
+import java.util
 
 import javax.mail.Flags
 import org.apache.james.events.delivery.InVmEventDelivery
-import org.apache.james.events.{InVMEventBus, MemoryEventDeadLetters, RetryBackoffConfiguration}
+import org.apache.james.events.{Event, EventBus, EventListener, Group, InVMEventBus, MemoryEventDeadLetters, Registration, RegistrationKey, RetryBackoffConfiguration}
 import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, MailboxChange, MailboxChangeRepository, State}
 import org.apache.james.jmap.api.model.AccountId
 import org.apache.james.jmap.change.MailboxChangeListenerTest.ACCOUNT_ID
@@ -36,6 +37,8 @@ import org.apache.james.mailbox.{MailboxManager, MailboxSessionUtil, MessageMana
 import org.apache.james.metrics.tests.RecordingMetricFactory
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.{BeforeEach, Nested, Test}
+import org.reactivestreams.Publisher
+import reactor.core.publisher.Mono
 
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
@@ -71,7 +74,16 @@ class MailboxChangeListenerTest {
     mailboxChangeRepository = new MemoryMailboxChangeRepository()
     emailChangeFactory = new EmailChange.Factory(stateFactory)
     emailChangeRepository = new MemoryEmailChangeRepository()
-    listener = MailboxChangeListener(mailboxChangeRepository, mailboxChangeFactory, emailChangeRepository, emailChangeFactory, mailboxManager, clock)
+    val eventBus = new EventBus {
+      override def register(listener: EventListener.ReactiveEventListener, key: RegistrationKey): Publisher[Registration] = Mono.empty()
+
+      override def register(listener: EventListener.ReactiveEventListener, group: Group): Registration = () => {}
+
+      override def dispatch(event: Event, key: util.Set[RegistrationKey]): Mono[Void] = Mono.empty()
+
+      override def reDeliver(group: Group, event: Event): Mono[Void] = Mono.empty()
+    }
+    listener = MailboxChangeListener(eventBus, mailboxChangeRepository, mailboxChangeFactory, emailChangeRepository, emailChangeFactory, mailboxManager, clock)
     resources.getEventBus.register(listener)
   }
 
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/InjectionKeys.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/InjectionKeys.java
new file mode 100644
index 0000000..7d2c080
--- /dev/null
+++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/InjectionKeys.java
@@ -0,0 +1,24 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap;
+
+public interface InjectionKeys { // names used to qualify injection points (@Named / Names.named)
+    String JMAP = "JMAP"; // qualifies the JMAP-specific EventBus and EventBusId bindings
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org