You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:46 UTC

[james-project] 18/33: JAMES-3491 Register state changes upon WebSocketPushEnable reception

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 737e2c04c28771b10c5094d8c6b15adf1d884aa3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Feb 5 10:10:56 2021 +0700

    JAMES-3491 Register state changes upon WebSocketPushEnable reception
---
 .../modules/protocols/JmapEventBusModule.java      | 19 ++++++++
 .../apache/james/jmap/api/change/JmapChange.java   |  4 +-
 .../jmap/change/AccountIdRegistrationKey.scala     |  5 ++
 .../james/jmap/change/MailboxChangeListener.scala  |  8 ++--
 .../org/apache/james/jmap/change/StateChange.scala |  4 +-
 .../james/jmap/core/WebSocketTransport.scala       |  4 +-
 .../james/jmap/json/ResponseSerializer.scala       |  6 +--
 .../apache/james/jmap/routes/WebSocketRoutes.scala | 53 ++++++++++++++++------
 8 files changed, 79 insertions(+), 24 deletions(-)

diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
index 9faba9d..0fafb26 100644
--- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
@@ -1,3 +1,22 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
 package org.apache.james.modules.protocols;
 
 import org.apache.james.events.EventBus;
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java
index b7a1a2e..3767de1 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.api.change;
 
-public interface JmapChange {
+import org.apache.james.jmap.api.model.AccountId;
 
+public interface JmapChange {
+    AccountId getAccountId();
 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala
index b2b969b..5bf3eba 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala
@@ -19,6 +19,7 @@
 
 package org.apache.james.jmap.change
 
+import org.apache.james.core.Username
 import org.apache.james.events.RegistrationKey
 import org.apache.james.jmap.api.model.AccountId
 
@@ -28,6 +29,10 @@ case class Factory() extends RegistrationKey.Factory {
   override def fromString(asString: String): RegistrationKey = AccountIdRegistrationKey(AccountId.fromString(asString))
 }
 
+object AccountIdRegistrationKey {
+  def of(username: Username): AccountIdRegistrationKey = AccountIdRegistrationKey(AccountId.fromUsername(username))
+}
+
 case class AccountIdRegistrationKey(accountId: AccountId) extends RegistrationKey {
   override def asString(): String = accountId.getIdentifier
 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 627941f..6e57c4f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -25,7 +25,7 @@ import javax.inject.{Inject, Named}
 import org.apache.james.core.Username
 import org.apache.james.events.Event.EventId
 import org.apache.james.events.EventListener.ReactiveGroupEventListener
-import org.apache.james.events.{Event, EventBus, Group, RegistrationKey}
+import org.apache.james.events.{Event, EventBus, Group}
 import org.apache.james.jmap.InjectionKeys
 import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxChange, MailboxChangeRepository}
 import org.apache.james.jmap.api.model.AccountId
@@ -101,7 +101,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
     SMono(jmapChange match {
       case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange)
       case emailChange: EmailChange => emailChangeRepository.save(emailChange)
-    }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), Set[RegistrationKey]().asJava)))
+    }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), AccountIdRegistrationKey(jmapChange.getAccountId))))
 
 
   private def getSharees(mailboxId: MailboxId, username: Username): List[AccountId] = {
@@ -131,7 +131,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
     case mailboxChange: MailboxChange => StateChangeEvent(
       eventId = EventId.random(),
       username = Username.of(mailboxChange.getAccountId.getIdentifier),
-      emailState = Some(State.fromJava(mailboxChange.getState)),
-      mailboxState = None)
+      emailState = None,
+      mailboxState = Some(State.fromJava(mailboxChange.getState)))
   }
 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
index 1b400be..312aa72 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
@@ -32,10 +32,10 @@ sealed trait TypeName {
   def asString(): String
 }
 case object MailboxTypeName extends TypeName {
-  override def asString(): String = "Mailbox"
+  override val asString: String = "Mailbox"
 }
 case object EmailTypeName extends TypeName {
-  override def asString(): String = "Email"
+  override val asString: String = "Email"
 }
 
 case class TypeState(changes: Map[TypeName, State]) {
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
index 3d1fd01..dd40f12 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
@@ -41,4 +41,6 @@ case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutb
     })
     .filter(_.nonEmpty)
     .map(StateChange)
-}
\ No newline at end of file
+}
+
+case class WebSocketPushEnable(dataTypes: Set[TypeName]) extends WebSocketInboundMessage
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index fff3c11..b1454c6 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -25,7 +25,7 @@ import java.net.URL
 import eu.timepit.refined.refineV
 import io.netty.handler.codec.http.HttpResponseStatus
 import org.apache.james.core.Username
-import org.apache.james.jmap.change.{TypeName, TypeState}
+import org.apache.james.jmap.change.{EmailTypeName, MailboxTypeName, TypeName, TypeState}
 import org.apache.james.jmap.core
 import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
 import org.apache.james.jmap.core.Id.IdConstraint
@@ -187,8 +187,8 @@ object ResponseSerializer {
     case _ => JsError("Expecting a JsObject to represent a webSocket inbound request")
   }
   private implicit val typeNameReads: Reads[TypeName] = {
-    case JsString(value) if value.equals(MailboxTypeName.asString()) => JsSuccess(MailboxTypeName)
-    case JsString(value) if value.equals(EmailTypeName.asString()) => JsSuccess(EmailTypeName)
+    case JsString(MailboxTypeName.asString) => JsSuccess(MailboxTypeName)
+    case JsString(EmailTypeName.asString) => JsSuccess(EmailTypeName)
     case _ => JsError("Expecting a JsString as typeName")
   }
   private implicit val webSocketPushEnableReads: Reads[WebSocketPushEnable] = Json.reads[WebSocketPushEnable]
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 4664eba..c97f4bf 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -26,13 +26,15 @@ import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
 import io.netty.handler.codec.http.HttpMethod
 import io.netty.handler.codec.http.websocketx.WebSocketFrame
 import javax.inject.{Inject, Named}
+import org.apache.james.events.{EventBus, Registration}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
-import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
 import org.apache.james.jmap.http.rfc8621.InjectionKeys
 import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
 import org.apache.james.jmap.json.ResponseSerializer
-import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
 import org.apache.james.mailbox.MailboxSession
 import org.slf4j.{Logger, LoggerFactory}
 import reactor.core.publisher.Mono
@@ -45,8 +47,21 @@ object WebSocketRoutes {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
 }
 
+case class ClientContext(outbound: WebsocketOutbound, pushRegistration: Option[Registration], session: MailboxSession) {
+  def latest(clientContext: ClientContext): ClientContext = {
+    clean
+    clientContext
+  }
+
+  def clean: ClientContext = {
+    pushRegistration.foreach(_.unregister())
+    ClientContext(outbound, None, session)
+  }
+}
+
 class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
                                  userProvisioner: UserProvisioning,
+                                 @Named(JMAPInjectionKeys.JMAP) eventBus: EventBus,
                                  jmapApi: JMAPApi) extends JMAPRoutes {
 
   override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
@@ -70,7 +85,8 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
       .`then`()
   }
 
-  private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] =
+  private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
+    val context = ClientContext(out, None, session)
     SFlux[WebSocketFrame](in.aggregateFrames()
       .receiveFrames())
       .map(frame => {
@@ -78,27 +94,38 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
         frame.content().readBytes(bytes)
         new String(bytes, StandardCharsets.UTF_8)
       })
-      .flatMap(handleClientMessages(session))
-      .onErrorResume(e => SMono.just[WebSocketOutboundMessage](asError(None)(e)))
-      .map(ResponseSerializer.serialize)
-      .map(_.toString)
-      .flatMap(response => out.sendString(SMono.just(response), StandardCharsets.UTF_8))
+      .flatMap(message => handleClientMessages(context)(message))
+      .reduce((c1: ClientContext, c2: ClientContext) => c1.latest(c2))
+      .map[ClientContext](context => context.clean)
       .`then`()
       .asJava()
       .`then`()
+  }
 
-  private def handleClientMessages(session: MailboxSession)(message: String): SMono[WebSocketOutboundMessage] =
+  private def handleClientMessages(clientContext: ClientContext)(message: String): SMono[ClientContext] =
     ResponseSerializer.deserializeWebSocketInboundMessage(message)
       .fold(invalid => {
-        val error = asError(None)(new IllegalArgumentException(invalid.toString()))
-        SMono.just[WebSocketOutboundMessage](error)
+        val error = ResponseSerializer.serialize(asError(None)(new IllegalArgumentException(invalid.toString())))
+        SMono(clientContext.outbound.sendString(SMono.just(error.toString()), StandardCharsets.UTF_8)
+          .`then`())
+          .`then`(SMono.just(clientContext))
       }, {
           case request: WebSocketRequest =>
-            jmapApi.process(request.requestObject, session)
+            jmapApi.process(request.requestObject, clientContext.session)
               .map[WebSocketOutboundMessage](WebSocketResponse(request.requestId, _))
               .onErrorResume(e => SMono.just(asError(request.requestId)(e)))
               .subscribeOn(Schedulers.elastic)
-        })
+              .onErrorResume(e => SMono.just[WebSocketOutboundMessage](asError(None)(e)))
+              .map(ResponseSerializer.serialize)
+              .map(_.toString)
+              .flatMap(response => SMono(clientContext.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
+              .`then`(SMono.just(clientContext))
+          case pushEnable: WebSocketPushEnable =>
+            SMono(eventBus.register(
+                StateChangeListener(pushEnable.dataTypes, clientContext.outbound),
+                AccountIdRegistrationKey.of(clientContext.session.getUser)))
+              .map((registration: Registration) => ClientContext(clientContext.outbound, Some(registration), clientContext.session))
+      })
 
   private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] =
     respondDetails(response, ProblemDetails.forThrowable(throwable))


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org