You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:49 UTC

[james-project] 21/33: JAMES-3491 Rework WebSocketRoutes

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit b6c02de8125acede33713d4178c16aceb66f5c22
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Feb 4 23:46:29 2021 +0700

    JAMES-3491 Rework WebSocketRoutes
    
     - Use a Sink to gather all messages sent to the client
    
    This enables centralized handling of errors, API messages, and push notifications
    
     - Revert buggy "functional friendly" un-registration
    
     - Use Json.stringify
    
     - Improve unregistration - get rid of closed registrations
---
 .../james/jmap/change/StateChangeListener.scala    | 21 ++++----
 .../apache/james/jmap/routes/WebSocketRoutes.scala | 63 +++++++++++-----------
 .../jmap/change/StateChangeListenerTest.scala      |  8 +--
 3 files changed, 47 insertions(+), 45 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index a085a15..c83f642 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -21,21 +21,22 @@ package org.apache.james.jmap.change
 
 import org.apache.james.events.Event
 import org.apache.james.events.EventListener.ReactiveEventListener
-import org.apache.james.jmap.core.StateChange
+import org.apache.james.jmap.core.WebSocketOutboundMessage
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Sinks
 import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
 import reactor.core.scala.publisher.SMono
 
-case class StateChangeListener(types: Set[TypeName], sink: Sinks.Many[StateChange]) extends ReactiveEventListener {
-  override def reactiveEvent(event: Event): Publisher[Void] = event match {
-    case stateChangeEvent: StateChangeEvent =>
-      SMono.fromCallable(() => {
-        val stateChange = stateChangeEvent.asStateChange.filter(types)
-        stateChange.foreach(next => sink.emitNext(next, FAIL_FAST))
-      }).asJava().`then`()
-    case _ => SMono.empty
-  }
+case class StateChangeListener(types: Set[TypeName], sink: Sinks.Many[WebSocketOutboundMessage]) extends ReactiveEventListener {
+  override def reactiveEvent(event: Event): Publisher[Void] =
+    event match {
+      case stateChangeEvent: StateChangeEvent =>
+        SMono.fromCallable(() =>
+          stateChangeEvent.asStateChange.filter(types)
+            .foreach(next => sink.emitNext(next, FAIL_FAST)))
+          .asJava().`then`()
+      case _ => SMono.empty
+    }
 
   override def isHandling(event: Event): Boolean = event match {
     case _: StateChangeEvent => true
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index f2b3ba8..bd038a5 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -20,6 +20,7 @@
 package org.apache.james.jmap.routes
 
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicReference
 import java.util.stream
 
 import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
@@ -30,13 +31,15 @@ import org.apache.james.events.{EventBus, Registration}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
 import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener}
-import org.apache.james.jmap.core.{ProblemDetails, RequestId, StateChange, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
 import org.apache.james.jmap.http.rfc8621.InjectionKeys
 import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
 import org.apache.james.jmap.json.ResponseSerializer
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
 import org.apache.james.mailbox.MailboxSession
 import org.slf4j.{Logger, LoggerFactory}
+import play.api.libs.json.Json
+import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
 import reactor.core.publisher.{Mono, Sinks}
 import reactor.core.scala.publisher.{SFlux, SMono}
 import reactor.core.scheduler.Schedulers
@@ -47,16 +50,15 @@ object WebSocketRoutes {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
 }
 
-case class ClientContext(outbound: WebsocketOutbound, pushRegistration: Option[Registration], session: MailboxSession) {
-  def latest(clientContext: ClientContext): ClientContext = {
-    clean
-    clientContext
-  }
+case class ClientContext(outbound: Sinks.Many[WebSocketOutboundMessage], pushRegistration: AtomicReference[Registration], session: MailboxSession) {
+  def withRegistration(registration: Registration): Unit = withRegistration(Some(registration))
 
-  def clean: ClientContext = {
-    pushRegistration.foreach(_.unregister())
-    ClientContext(outbound, None, session)
-  }
+  def clean(): Unit = withRegistration(None)
+
+  def withRegistration(registration: Option[Registration]): Unit = Option(pushRegistration.getAndSet(registration.orNull))
+    .foreach(oldRegistration => SMono.fromCallable(() => oldRegistration.unregister())
+      .subscribeOn(Schedulers.elastic())
+      .subscribe())
 }
 
 class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
@@ -86,7 +88,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
   }
 
   private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
-    val context = ClientContext(out, None, session)
+    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
+
+    out.sendString(
+      sink.asFlux()
+        .map(ResponseSerializer.serialize)
+        .map(Json.stringify),
+      StandardCharsets.UTF_8).`then`
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    val context = ClientContext(sink, new AtomicReference[Registration](), session)
     SFlux[WebSocketFrame](in.aggregateFrames()
       .receiveFrames())
       .map(frame => {
@@ -95,20 +107,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
         new String(bytes, StandardCharsets.UTF_8)
       })
       .flatMap(message => handleClientMessages(context)(message))
-      .reduce((c1: ClientContext, c2: ClientContext) => c1.latest(c2))
-      .map[ClientContext](context => context.clean)
+      .doOnTerminate(context.clean)
       .`then`()
       .asJava()
       .`then`()
   }
 
-  private def handleClientMessages(clientContext: ClientContext)(message: String): SMono[ClientContext] =
+  private def handleClientMessages(clientContext: ClientContext)(message: String): SMono[Unit] =
     ResponseSerializer.deserializeWebSocketInboundMessage(message)
       .fold(invalid => {
-        val error = ResponseSerializer.serialize(asError(None)(new IllegalArgumentException(invalid.toString())))
-        SMono(clientContext.outbound.sendString(SMono.just(error.toString()), StandardCharsets.UTF_8)
-          .`then`())
-          .`then`(SMono.just(clientContext))
+        val error = asError(None)(new IllegalArgumentException(invalid.toString()))
+        SMono.fromCallable(() => clientContext.outbound.emitNext(error, FAIL_FAST))
       }, {
           case request: WebSocketRequest =>
             jmapApi.process(request.requestObject, clientContext.session)
@@ -116,22 +125,14 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
               .onErrorResume(e => SMono.just(asError(request.requestId)(e)))
               .subscribeOn(Schedulers.elastic)
               .onErrorResume(e => SMono.just[WebSocketOutboundMessage](asError(None)(e)))
-              .map(ResponseSerializer.serialize)
-              .map(_.toString)
-              .flatMap(response => SMono(clientContext.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
-              .`then`(SMono.just(clientContext))
+              .doOnNext(next => clientContext.outbound.emitNext(next, FAIL_FAST))
+              .`then`()
           case pushEnable: WebSocketPushEnable =>
-            val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureError()
             SMono(eventBus.register(
-                StateChangeListener(pushEnable.dataTypes, sink),
+                StateChangeListener(pushEnable.dataTypes, clientContext.outbound),
                 AccountIdRegistrationKey.of(clientContext.session.getUser)))
-              .map((registration: Registration) => ClientContext(clientContext.outbound, Some(registration), clientContext.session))
-              .doOnNext(context => sink.asFlux()
-                .map(ResponseSerializer.serialize)
-                .map(_.toString)
-                .flatMap(response => SMono(context.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
-                .subscribeOn(Schedulers.elastic())
-                .subscribe())
+              .doOnNext(newRegistration => clientContext.withRegistration(newRegistration))
+              .`then`()
       })
 
   private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] =
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
index 0addd3e..efa5775 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
@@ -21,7 +21,7 @@ package org.apache.james.jmap.change
 
 import org.apache.james.core.Username
 import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.{AccountId, State, StateChange}
+import org.apache.james.jmap.core.{AccountId, State, StateChange, WebSocketOutboundMessage}
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Test
 import reactor.core.publisher.Sinks
@@ -36,7 +36,7 @@ class StateChangeListenerTest {
 
   @Test
   def reactiveEventShouldSendAnOutboundMessage(): Unit = {
-    val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
     val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
       mailboxState = Some(mailboxState),
@@ -54,7 +54,7 @@ class StateChangeListenerTest {
 
   @Test
   def reactiveEventShouldOmitUnwantedTypes(): Unit = {
-    val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
     val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
       mailboxState = Some(mailboxState),
@@ -71,7 +71,7 @@ class StateChangeListenerTest {
 
   @Test
   def reactiveEventShouldFilterOutUnwantedEvents(): Unit = {
-    val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
     val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
       mailboxState = None,


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org