You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:49 UTC
[james-project] 21/33: JAMES-3491 Rework WebSocketRoutes
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit b6c02de8125acede33713d4178c16aceb66f5c22
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Feb 4 23:46:29 2021 +0700
JAMES-3491 Rework WebSocketRoutes
- Use a Sink to gather all messages sent to the client
This enables centralized handling of errors, API messages, and push notifications
- Revert buggy "functional friendly" un-registration
- Use Json.stringify
- Improve unregistration - get rid of closed registrations
---
.../james/jmap/change/StateChangeListener.scala | 21 ++++----
.../apache/james/jmap/routes/WebSocketRoutes.scala | 63 +++++++++++-----------
.../jmap/change/StateChangeListenerTest.scala | 8 +--
3 files changed, 47 insertions(+), 45 deletions(-)
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index a085a15..c83f642 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -21,21 +21,22 @@ package org.apache.james.jmap.change
import org.apache.james.events.Event
import org.apache.james.events.EventListener.ReactiveEventListener
-import org.apache.james.jmap.core.StateChange
+import org.apache.james.jmap.core.WebSocketOutboundMessage
import org.reactivestreams.Publisher
import reactor.core.publisher.Sinks
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
import reactor.core.scala.publisher.SMono
-case class StateChangeListener(types: Set[TypeName], sink: Sinks.Many[StateChange]) extends ReactiveEventListener {
- override def reactiveEvent(event: Event): Publisher[Void] = event match {
- case stateChangeEvent: StateChangeEvent =>
- SMono.fromCallable(() => {
- val stateChange = stateChangeEvent.asStateChange.filter(types)
- stateChange.foreach(next => sink.emitNext(next, FAIL_FAST))
- }).asJava().`then`()
- case _ => SMono.empty
- }
+case class StateChangeListener(types: Set[TypeName], sink: Sinks.Many[WebSocketOutboundMessage]) extends ReactiveEventListener {
+ override def reactiveEvent(event: Event): Publisher[Void] =
+ event match {
+ case stateChangeEvent: StateChangeEvent =>
+ SMono.fromCallable(() =>
+ stateChangeEvent.asStateChange.filter(types)
+ .foreach(next => sink.emitNext(next, FAIL_FAST)))
+ .asJava().`then`()
+ case _ => SMono.empty
+ }
override def isHandling(event: Event): Boolean = event match {
case _: StateChangeEvent => true
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index f2b3ba8..bd038a5 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -20,6 +20,7 @@
package org.apache.james.jmap.routes
import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicReference
import java.util.stream
import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
@@ -30,13 +31,15 @@ import org.apache.james.events.{EventBus, Registration}
import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
import org.apache.james.jmap.JMAPUrls.JMAP_WS
import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener}
-import org.apache.james.jmap.core.{ProblemDetails, RequestId, StateChange, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
import org.apache.james.jmap.http.rfc8621.InjectionKeys
import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
import org.apache.james.jmap.json.ResponseSerializer
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
import org.apache.james.mailbox.MailboxSession
import org.slf4j.{Logger, LoggerFactory}
+import play.api.libs.json.Json
+import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
import reactor.core.publisher.{Mono, Sinks}
import reactor.core.scala.publisher.{SFlux, SMono}
import reactor.core.scheduler.Schedulers
@@ -47,16 +50,15 @@ object WebSocketRoutes {
val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
}
-case class ClientContext(outbound: WebsocketOutbound, pushRegistration: Option[Registration], session: MailboxSession) {
- def latest(clientContext: ClientContext): ClientContext = {
- clean
- clientContext
- }
+case class ClientContext(outbound: Sinks.Many[WebSocketOutboundMessage], pushRegistration: AtomicReference[Registration], session: MailboxSession) {
+ def withRegistration(registration: Registration): Unit = withRegistration(Some(registration))
- def clean: ClientContext = {
- pushRegistration.foreach(_.unregister())
- ClientContext(outbound, None, session)
- }
+ def clean(): Unit = withRegistration(None)
+
+ def withRegistration(registration: Option[Registration]): Unit = Option(pushRegistration.getAndSet(registration.orNull))
+ .foreach(oldRegistration => SMono.fromCallable(() => oldRegistration.unregister())
+ .subscribeOn(Schedulers.elastic())
+ .subscribe())
}
class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
@@ -86,7 +88,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
}
private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
- val context = ClientContext(out, None, session)
+ val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
+
+ out.sendString(
+ sink.asFlux()
+ .map(ResponseSerializer.serialize)
+ .map(Json.stringify),
+ StandardCharsets.UTF_8).`then`
+ .subscribeOn(Schedulers.elastic())
+ .subscribe()
+
+ val context = ClientContext(sink, new AtomicReference[Registration](), session)
SFlux[WebSocketFrame](in.aggregateFrames()
.receiveFrames())
.map(frame => {
@@ -95,20 +107,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
new String(bytes, StandardCharsets.UTF_8)
})
.flatMap(message => handleClientMessages(context)(message))
- .reduce((c1: ClientContext, c2: ClientContext) => c1.latest(c2))
- .map[ClientContext](context => context.clean)
+ .doOnTerminate(context.clean)
.`then`()
.asJava()
.`then`()
}
- private def handleClientMessages(clientContext: ClientContext)(message: String): SMono[ClientContext] =
+ private def handleClientMessages(clientContext: ClientContext)(message: String): SMono[Unit] =
ResponseSerializer.deserializeWebSocketInboundMessage(message)
.fold(invalid => {
- val error = ResponseSerializer.serialize(asError(None)(new IllegalArgumentException(invalid.toString())))
- SMono(clientContext.outbound.sendString(SMono.just(error.toString()), StandardCharsets.UTF_8)
- .`then`())
- .`then`(SMono.just(clientContext))
+ val error = asError(None)(new IllegalArgumentException(invalid.toString()))
+ SMono.fromCallable(() => clientContext.outbound.emitNext(error, FAIL_FAST))
}, {
case request: WebSocketRequest =>
jmapApi.process(request.requestObject, clientContext.session)
@@ -116,22 +125,14 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
.onErrorResume(e => SMono.just(asError(request.requestId)(e)))
.subscribeOn(Schedulers.elastic)
.onErrorResume(e => SMono.just[WebSocketOutboundMessage](asError(None)(e)))
- .map(ResponseSerializer.serialize)
- .map(_.toString)
- .flatMap(response => SMono(clientContext.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
- .`then`(SMono.just(clientContext))
+ .doOnNext(next => clientContext.outbound.emitNext(next, FAIL_FAST))
+ .`then`()
case pushEnable: WebSocketPushEnable =>
- val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureError()
SMono(eventBus.register(
- StateChangeListener(pushEnable.dataTypes, sink),
+ StateChangeListener(pushEnable.dataTypes, clientContext.outbound),
AccountIdRegistrationKey.of(clientContext.session.getUser)))
- .map((registration: Registration) => ClientContext(clientContext.outbound, Some(registration), clientContext.session))
- .doOnNext(context => sink.asFlux()
- .map(ResponseSerializer.serialize)
- .map(_.toString)
- .flatMap(response => SMono(context.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
- .subscribeOn(Schedulers.elastic())
- .subscribe())
+ .doOnNext(newRegistration => clientContext.withRegistration(newRegistration))
+ .`then`()
})
private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] =
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
index 0addd3e..efa5775 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
@@ -21,7 +21,7 @@ package org.apache.james.jmap.change
import org.apache.james.core.Username
import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.{AccountId, State, StateChange}
+import org.apache.james.jmap.core.{AccountId, State, StateChange, WebSocketOutboundMessage}
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.Test
import reactor.core.publisher.Sinks
@@ -36,7 +36,7 @@ class StateChangeListenerTest {
@Test
def reactiveEventShouldSendAnOutboundMessage(): Unit = {
- val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+ val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
val event = StateChangeEvent(eventId = eventId,
username = Username.of("bob"),
mailboxState = Some(mailboxState),
@@ -54,7 +54,7 @@ class StateChangeListenerTest {
@Test
def reactiveEventShouldOmitUnwantedTypes(): Unit = {
- val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+ val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
val event = StateChangeEvent(eventId = eventId,
username = Username.of("bob"),
mailboxState = Some(mailboxState),
@@ -71,7 +71,7 @@ class StateChangeListenerTest {
@Test
def reactiveEventShouldFilterOutUnwantedEvents(): Unit = {
- val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+ val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
val event = StateChangeEvent(eventId = eventId,
username = Username.of("bob"),
mailboxState = None,
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org