You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:48 UTC

[james-project] 20/33: JAMES-3491 Transport independent StateChangeListener

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit a661388fbf00f1649e694e00d617d74239a98259
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Feb 4 16:48:08 2021 +0700

    JAMES-3491 Transport independent StateChangeListener
    
     - Allow writing tests without mocks
     - Enable reuse for other push mechanisms:
       - SSE (event-source)
       - Web hooks
---
 .../james/jmap/change/StateChangeListener.scala    | 20 ++---
 .../apache/james/jmap/routes/WebSocketRoutes.scala | 13 ++-
 .../jmap/change/StateChangeListenerTest.scala      | 96 +++++++++-------------
 3 files changed, 55 insertions(+), 74 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index 57fd4f2..a085a15 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -19,25 +19,21 @@
 
 package org.apache.james.jmap.change
 
-import java.nio.charset.StandardCharsets
-
 import org.apache.james.events.Event
 import org.apache.james.events.EventListener.ReactiveEventListener
-import org.apache.james.jmap.json.ResponseSerializer
+import org.apache.james.jmap.core.StateChange
 import org.reactivestreams.Publisher
-import play.api.libs.json.Json
+import reactor.core.publisher.Sinks
+import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
 import reactor.core.scala.publisher.SMono
-import reactor.core.scheduler.Schedulers
-import reactor.netty.http.websocket.WebsocketOutbound
 
-case class StateChangeListener(types: Set[TypeName], outbound: WebsocketOutbound) extends ReactiveEventListener {
+case class StateChangeListener(types: Set[TypeName], sink: Sinks.Many[StateChange]) extends ReactiveEventListener {
   override def reactiveEvent(event: Event): Publisher[Void] = event match {
     case stateChangeEvent: StateChangeEvent =>
-      val stateChange = stateChangeEvent.asStateChange.filter(types)
-      val jsonString = stateChange.map(ResponseSerializer.serialize).map(Json.stringify)
-      jsonString.map(json => SMono(outbound.sendString(SMono.just[String](json), StandardCharsets.UTF_8))
-        .subscribeOn(Schedulers.elastic))
-        .getOrElse(SMono.empty)
+      SMono.fromCallable(() => {
+        val stateChange = stateChangeEvent.asStateChange.filter(types)
+        stateChange.foreach(next => sink.emitNext(next, FAIL_FAST))
+      }).asJava().`then`()
     case _ => SMono.empty
   }
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index c97f4bf..f2b3ba8 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -30,14 +30,14 @@ import org.apache.james.events.{EventBus, Registration}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
 import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener}
-import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, StateChange, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
 import org.apache.james.jmap.http.rfc8621.InjectionKeys
 import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
 import org.apache.james.jmap.json.ResponseSerializer
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
 import org.apache.james.mailbox.MailboxSession
 import org.slf4j.{Logger, LoggerFactory}
-import reactor.core.publisher.Mono
+import reactor.core.publisher.{Mono, Sinks}
 import reactor.core.scala.publisher.{SFlux, SMono}
 import reactor.core.scheduler.Schedulers
 import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
@@ -121,10 +121,17 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
               .flatMap(response => SMono(clientContext.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
               .`then`(SMono.just(clientContext))
           case pushEnable: WebSocketPushEnable =>
+            val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureError()
             SMono(eventBus.register(
-                StateChangeListener(pushEnable.dataTypes, clientContext.outbound),
+                StateChangeListener(pushEnable.dataTypes, sink),
                 AccountIdRegistrationKey.of(clientContext.session.getUser)))
               .map((registration: Registration) => ClientContext(clientContext.outbound, Some(registration), clientContext.session))
+              .doOnNext(context => sink.asFlux()
+                .map(ResponseSerializer.serialize)
+                .map(_.toString)
+                .flatMap(response => SMono(context.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
+                .subscribeOn(Schedulers.elastic())
+                .subscribe())
       })
 
   private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] =
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
index d208ca2..0addd3e 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
@@ -19,91 +19,69 @@
 
 package org.apache.james.jmap.change
 
-import java.nio.charset.Charset
-
-import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
 import org.apache.james.core.Username
 import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.State
+import org.apache.james.jmap.core.{AccountId, State, StateChange}
+import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Test
-import org.mockito.ArgumentCaptor
-import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, verify, verifyZeroInteractions, when}
-import org.reactivestreams.Publisher
+import reactor.core.publisher.Sinks
+import reactor.core.publisher.Sinks.EmitFailureHandler
 import reactor.core.scala.publisher.SMono
-import reactor.netty.NettyOutbound
-import reactor.netty.http.websocket.WebsocketOutbound
+import reactor.core.scheduler.Schedulers
 
 class StateChangeListenerTest {
-  private val outbound: WebsocketOutbound = mock(classOf[WebsocketOutbound])
+  private val mailboxState = State.fromStringUnchecked("2f9f1b12-b35a-43e6-9af2-0106fb53a943")
+  private val emailState = State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")
+  private val eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4")
 
   @Test
   def reactiveEventShouldSendAnOutboundMessage(): Unit = {
-    val event = StateChangeEvent(eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4"),
+    val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+    val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
-      mailboxState = Some(State.fromStringUnchecked("2f9f1b12-b35a-43e6-9af2-0106fb53a943")),
-      emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
-    val listener = StateChangeListener(Set(MailboxTypeName, EmailTypeName), outbound)
-    val nettyOutbound = mock(classOf[NettyOutbound])
-    when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
+      mailboxState = Some(mailboxState),
+      emailState = Some(emailState))
+    val listener = StateChangeListener(Set(MailboxTypeName, EmailTypeName), sink)
 
-    listener.reactiveEvent(event)
+    SMono(listener.reactiveEvent(event)).subscribeOn(Schedulers.elastic()).block()
+    sink.emitComplete(EmitFailureHandler.FAIL_FAST)
 
-    val captor: ArgumentCaptor[Publisher[String]] = ArgumentCaptor.forClass(classOf[Publisher[String]])
-    verify(outbound).sendString(captor.capture(), any(classOf[Charset]))
-    assertThatJson(SMono(captor.getValue).block()).isEqualTo(
-      """
-        |{
-        |  "@type":"StateChange",
-        |  "changed":{
-        |    "81b637d8fcd2c6da6359e6963113a1170de795e4b725b84d1e0b4cfd9ec58ce9":{
-        |      "Mailbox":"2f9f1b12-b35a-43e6-9af2-0106fb53a943",
-        |      "Email":"2d9f1b12-b35a-43e6-9af2-0106fb53a943"
-        |    }
-        |  }
-        |}
-        |""".stripMargin)
+    assertThat(sink.asFlux().collectList().block())
+      .containsExactly(StateChange(Map(AccountId.from(Username.of("bob")).toOption.get  -> TypeState(Map(
+        MailboxTypeName -> mailboxState,
+        EmailTypeName -> emailState)))))
   }
 
   @Test
   def reactiveEventShouldOmitUnwantedTypes(): Unit = {
-    val event = StateChangeEvent(eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4"),
+    val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+    val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
-      mailboxState = Some(State.fromStringUnchecked("2f9f1b12-b35a-43e6-9af2-0106fb53a943")),
-      emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
-    val listener = StateChangeListener(Set(MailboxTypeName), outbound)
-    val nettyOutbound = mock(classOf[NettyOutbound])
-    when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
+      mailboxState = Some(mailboxState),
+      emailState = Some(emailState))
+    val listener = StateChangeListener(Set(MailboxTypeName), sink)
 
-    listener.reactiveEvent(event)
+    SMono(listener.reactiveEvent(event)).subscribeOn(Schedulers.elastic()).block()
+    sink.emitComplete(EmitFailureHandler.FAIL_FAST)
 
-    val captor: ArgumentCaptor[Publisher[String]] = ArgumentCaptor.forClass(classOf[Publisher[String]])
-    verify(outbound).sendString(captor.capture(), any(classOf[Charset]))
-    assertThatJson(SMono(captor.getValue).block()).isEqualTo(
-      """
-        |{
-        |  "@type":"StateChange",
-        |  "changed":{
-        |    "81b637d8fcd2c6da6359e6963113a1170de795e4b725b84d1e0b4cfd9ec58ce9":{
-        |      "Mailbox":"2f9f1b12-b35a-43e6-9af2-0106fb53a943"
-        |    }
-        |  }
-        |}
-        |""".stripMargin)
+    assertThat(sink.asFlux().collectList().block())
+      .containsExactly(StateChange(Map(AccountId.from(Username.of("bob")).toOption.get -> TypeState(Map(
+        MailboxTypeName -> mailboxState)))))
   }
 
   @Test
   def reactiveEventShouldFilterOutUnwantedEvents(): Unit = {
-    val event = StateChangeEvent(eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4"),
+    val sink: Sinks.Many[StateChange] = Sinks.many().unicast().onBackpressureBuffer()
+    val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
       mailboxState = None,
-      emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
-    val listener = StateChangeListener(Set(MailboxTypeName), outbound)
-    val nettyOutbound = mock(classOf[NettyOutbound])
-    when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
+      emailState = Some(emailState))
+    val listener = StateChangeListener(Set(MailboxTypeName), sink)
 
-    listener.reactiveEvent(event)
+    SMono(listener.reactiveEvent(event)).subscribeOn(Schedulers.elastic()).block()
+    sink.emitComplete(EmitFailureHandler.FAIL_FAST)
 
-    verifyZeroInteractions(outbound)
+    assertThat(sink.asFlux().collectList().block())
+      .isEmpty()
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org