You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/14 06:54:04 UTC

[james-project] 14/18: JAMES-3491 WebSocket PUSH cancellation

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 9569e94c64123c875932bb5aa68b08987537b4ac
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Fri Feb 5 10:57:12 2021 +0700

    JAMES-3491 WebSocket PUSH cancellation
---
 .../jmap/rfc8621/contract/WebSocketContract.scala  | 102 +++++++++++++++++++++
 .../james/jmap/core/WebSocketTransport.scala       |   3 +-
 .../james/jmap/json/ResponseSerializer.scala       |   1 +
 .../apache/james/jmap/routes/WebSocketRoutes.scala |   3 +-
 4 files changed, 107 insertions(+), 2 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index b8f443a..b80740d 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -35,6 +35,8 @@ import org.apache.james.utils.DataProbeImpl
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
 import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
 import play.api.libs.json.{JsString, Json}
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
 import sttp.capabilities.WebSockets
 import sttp.client3.monad.IdMonad
 import sttp.client3.okhttp.OkHttpSyncBackend
@@ -966,6 +968,106 @@ trait WebSocketContract {
       .contains(mailboxStateChange, emailStateChange)
   }
 
+  // Enables push, immediately disables it, then issues an Email/set: the socket must yield the
+  // method response only, and the next receive must hit the timeout fallback, not a StateChange.
+  @Test
+  @Timeout(180)
+  def pushCancelRequestsShouldDisableNotification(server: GuiceJamesServer): Unit = {
+    val bobPath = MailboxPath.inbox(BOB)
+    val accountId: AccountId = AccountId.fromUsername(BOB)
+    val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
+    Thread.sleep(100)
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": ["Mailbox", "Email"]
+                |}""".stripMargin))
+
+            Thread.sleep(100) // presumably lets the server process the previous frame — TODO confirm
+
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushDisable"
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            ws.send(WebSocketFrame.text(
+              s"""{
+                 |  "@type": "Request",
+                 |  "requestId": "req-36",
+                 |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+                 |  "methodCalls": [
+                 |    ["Email/set", {
+                 |      "accountId": "$ACCOUNT_ID",
+                 |      "create": {
+                 |        "aaaaaa":{
+                 |          "mailboxIds": {
+                 |             "${mailboxId.serialize}": true
+                 |          }
+                 |        }
+                 |      }
+                 |    }, "c1"]]
+                 |}""".stripMargin))
+
+            val response = ws.receive()
+              .map { case t: Text =>
+                t.payload
+              }
+            // Wait at most 100 ms for a second frame; the fallback string stands in when none arrives.
+            val maybeNotification: String = SMono.fromCallable(() =>
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                })
+              .timeout(scala.concurrent.duration.Duration(100, "millis"), Some(SMono.just("No notification received")))
+              .subscribeOn(Schedulers.boundedElastic())
+              .block()
+
+            List(response, maybeNotification)
+        })
+        .send(backend)
+        .body
+
+    Thread.sleep(100)
+    val jmapGuiceProbe: JmapGuiceProbe = server.getProbe(classOf[JmapGuiceProbe])
+    val emailState: String = jmapGuiceProbe.getLatestEmailState(accountId).getValue.toString
+    val mailboxState: String = jmapGuiceProbe.getLatestMailboxState(accountId).getValue.toString
+    val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"$mailboxState"}}}"""
+    val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"$emailState"}}}"""
+
+    assertThat(response.toOption.get.asJava)
+      .hasSize(2) // Email create response + no notification message
+      .contains("No notification received")
+      .doesNotContain(mailboxStateChange, emailStateChange)
+  }
+
+  // Sending WebSocketPushDisable before any push was enabled must leave the connection open.
+  @Test
+  @Timeout(180)
+  def pushCancelRequestAsFirstMessageShouldBeProcessedNormally(server: GuiceJamesServer): Unit = {
+    val pushDisableFrame: String =
+      """{
+        |  "@type": "WebSocketPushDisable"
+        |}""".stripMargin
+
+    Thread.sleep(100)
+
+    authenticatedRequest(server)
+      .response(asWebSocket[Identity, Unit] { webSocket =>
+        webSocket.send(WebSocketFrame.text(pushDisableFrame))
+        Thread.sleep(100)
+        assertThat(webSocket.isOpen()).isTrue
+      })
+      .send(backend)
+      .body
+  }
+
   private def authenticatedRequest(server: GuiceJamesServer): RequestT[Identity, Either[String, String], Any] = {
     val port = server.getProbe(classOf[JmapGuiceProbe])
       .getJmapPort
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
index 90ca138..4c25d4a 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
@@ -43,4 +43,5 @@ case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutb
     .map(StateChange)
 }
 
-case class WebSocketPushEnable(dataTypes: Option[Set[TypeName]]) extends WebSocketInboundMessage
\ No newline at end of file
+case class WebSocketPushEnable(dataTypes: Option[Set[TypeName]]) extends WebSocketInboundMessage
+case object WebSocketPushDisable extends WebSocketInboundMessage // field-less inbound message; parsed from "@type": "WebSocketPushDisable"
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index 042d4d8..46aa195 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -202,6 +202,7 @@ object ResponseSerializer {
       json.value.get("@type") match {
         case Some(JsString("Request")) => webSocketRequestReads.reads(json)
         case Some(JsString("WebSocketPushEnable")) => webSocketPushEnableReads.reads(json)
+        case Some(JsString("WebSocketPushDisable")) => JsSuccess(WebSocketPushDisable)
         case Some(JsString(unknownType)) => JsError(s"Unknown @type field on a webSocket inbound message: $unknownType")
         case Some(invalidType) => JsError(s"Invalid @type field on a webSocket inbound message: expecting a JsString, got $invalidType")
         case None => JsError(s"Missing @type field on a webSocket inbound message")
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index d6d2c40..c45ef45 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -31,7 +31,7 @@ import org.apache.james.events.{EventBus, Registration}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
 import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, TypeName}
-import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
 import org.apache.james.jmap.http.rfc8621.InjectionKeys
 import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
 import org.apache.james.jmap.json.ResponseSerializer
@@ -133,6 +133,7 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
                 AccountIdRegistrationKey.of(clientContext.session.getUser)))
               .doOnNext(newRegistration => clientContext.withRegistration(newRegistration))
               .`then`()
+          case WebSocketPushDisable => SMono.fromCallable(() => clientContext.clean())
       })
 
   private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] =


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org