You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:42 UTC

[james-project] 14/33: JAMES-3491 Provide a basic StateChangeListener

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit db48545bce46c66cdc8d8303e4e040269d64e99a
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Wed Feb 3 09:52:26 2021 +0700

    JAMES-3491 Provide a basic StateChangeListener
---
 .../org/apache/james/jmap/change/StateChange.scala | 16 ++++--
 ...StateChange.scala => StateChangeListener.scala} | 58 ++++++++-----------
 .../james/jmap/core/WebSocketTransport.scala       |  6 +-
 .../james/jmap/json/ResponseSerializer.scala       | 11 ++++
 .../scala/org/apache/james/jmap/json/package.scala |  2 +-
 .../jmap/change/StateChangeListenerTest.scala      | 67 ++++++++++++++++++++++
 6 files changed, 118 insertions(+), 42 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
index d6c555c..7aa9bf1 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
@@ -22,17 +22,21 @@ package org.apache.james.jmap.change
 import org.apache.james.core.Username
 import org.apache.james.events.Event
 import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.{AccountId, State}
+import org.apache.james.jmap.core.{AccountId, State, StateChange}
 
 sealed trait TypeName {
   def asMap(maybeState: Option[State]): Map[TypeName, State] =
     maybeState.map(state => Map[TypeName, State](this -> state))
       .getOrElse(Map())
-}
-case object MailboxTypeName extends TypeName
-case object EmailTypeName extends TypeName
 
-case class StateChange(changes: Map[AccountId, TypeState])
+  def asString(): String
+}
+case object MailboxTypeName extends TypeName {
+  override def asString(): String = "Mailbox"
+}
+case object EmailTypeName extends TypeName {
+  override def asString(): String = "Email"
+}
 
 case class TypeState(changes: Map[TypeName, State])
 
@@ -51,7 +55,7 @@ case class StateChangeEvent(eventId: EventId,
 
   override def getUsername: Username = username
 
-  override def isNoop: Boolean = false
+  override def isNoop: Boolean = mailboxState.isEmpty && emailState.isEmpty
 
   override def getEventId: EventId = eventId
 }
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
similarity index 52%
copy from server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
copy to server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index d6c555c..c653a8a 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -19,39 +19,29 @@
 
 package org.apache.james.jmap.change
 
-import org.apache.james.core.Username
-import org.apache.james.events.Event
-import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.{AccountId, State}
+import java.nio.charset.StandardCharsets
 
-sealed trait TypeName {
-  def asMap(maybeState: Option[State]): Map[TypeName, State] =
-    maybeState.map(state => Map[TypeName, State](this -> state))
-      .getOrElse(Map())
+import org.apache.james.events.Event
+import org.apache.james.events.EventListener.ReactiveEventListener
+import org.apache.james.jmap.json.ResponseSerializer
+import org.reactivestreams.Publisher
+import play.api.libs.json.Json
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.websocket.WebsocketOutbound
+
+case class StateChangeListener(outbound: WebsocketOutbound) extends ReactiveEventListener {
+  override def reactiveEvent(event: Event): Publisher[Void] = event match {
+    case stateChangeEvent: StateChangeEvent =>
+      val stateChange = stateChangeEvent.asStateChange
+      val jsonString = Json.stringify(ResponseSerializer.serialize(stateChange))
+      SMono(outbound.sendString(SMono.just[String](jsonString), StandardCharsets.UTF_8))
+        .subscribeOn(Schedulers.elastic)
+    case _ => SMono.empty
+  }
+
+  override def isHandling(event: Event): Boolean = event match {
+    case _: StateChangeEvent => true
+    case _ => false
+  }
 }
-case object MailboxTypeName extends TypeName
-case object EmailTypeName extends TypeName
-
-case class StateChange(changes: Map[AccountId, TypeState])
-
-case class TypeState(changes: Map[TypeName, State])
-
-case class StateChangeEvent(eventId: EventId,
-                            username: Username,
-                            mailboxState: Option[State],
-                            emailState: Option[State]) extends Event {
-
-  def asStateChange: StateChange =
-    StateChange(Map(AccountId.from(username).fold(
-      failure => throw new IllegalArgumentException(failure),
-      success => success) ->
-      TypeState(
-        MailboxTypeName.asMap(mailboxState) ++
-          EmailTypeName.asMap(emailState))))
-
-  override def getUsername: Username = username
-
-  override def isNoop: Boolean = false
-
-  override def getEventId: EventId = eventId
-}
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
index f7df48b..b86cd07 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
@@ -19,6 +19,8 @@
 
 package org.apache.james.jmap.core
 
+import org.apache.james.jmap.change.TypeState
+
 sealed trait WebSocketInboundMessage
 
 sealed trait WebSocketOutboundMessage
@@ -29,4 +31,6 @@ case class WebSocketRequest(requestId: Option[RequestId], requestObject: Request
 
 case class WebSocketResponse(requestId: Option[RequestId], responseObject: ResponseObject) extends WebSocketOutboundMessage
 
-case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends WebSocketOutboundMessage
\ No newline at end of file
+case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends WebSocketOutboundMessage
+
+case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutboundMessage
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index 2304565..fff3c11 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -25,6 +25,7 @@ import java.net.URL
 import eu.timepit.refined.refineV
 import io.netty.handler.codec.http.HttpResponseStatus
 import org.apache.james.core.Username
+import org.apache.james.jmap.change.{TypeName, TypeState}
 import org.apache.james.jmap.core
 import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
 import org.apache.james.jmap.core.Id.IdConstraint
@@ -202,6 +203,15 @@ object ResponseSerializer {
       }
     case _ => JsError("Expecting a JsObject to represent a webSocket inbound message")
   }
+
+  private implicit val typeStateMapWrites: Writes[Map[TypeName, State]] = mapWrites[TypeName, State](_.asString(), stateWrites)
+  private implicit val typeStateWrites: Writes[TypeState] = Json.valueWrites[TypeState]
+  private implicit val changeWrites: OWrites[Map[AccountId, TypeState]] = mapWrites[AccountId, TypeState](_.id.value, typeStateWrites)
+  private implicit val stateChangeWrites: Writes[StateChange] = stateChange =>
+    JsObject(Map(
+      "@type" -> JsString("StateChange"),
+      "changed" -> changeWrites.writes(stateChange.changes)))
+
   private implicit val webSocketResponseWrites: Writes[WebSocketResponse] = response => {
     val apiResponseJson: JsObject = responseObjectFormat.writes(response.responseObject)
     JsObject(Map(
@@ -217,6 +227,7 @@ object ResponseSerializer {
       ++ errorJson.value)
   }
   private implicit val webSocketOutboundWrites: Writes[WebSocketOutboundMessage] = {
+    case stateChange: StateChange => stateChangeWrites.writes(stateChange)
     case response: WebSocketResponse => webSocketResponseWrites.writes(response)
     case error: WebSocketError => webSocketErrorWrites.writes(error)
   }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala
index e3160cf..fb77977 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala
@@ -43,7 +43,7 @@ package object json {
     case _ => JsError("Expecting mailboxId value to be a boolean")
   }
 
-  def mapWrites[K, V](keyWriter: K => String, valueWriter: Writes[V]): Writes[Map[K, V]] =
+  def mapWrites[K, V](keyWriter: K => String, valueWriter: Writes[V]): OWrites[Map[K, V]] =
     (ids: Map[K, V]) => {
       ids.foldLeft(JsObject.empty)((jsObject, kv) => {
         val (key: K, value: V) = kv
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
new file mode 100644
index 0000000..ecd83a0
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
@@ -0,0 +1,67 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.change
+
+import java.nio.charset.Charset
+
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import org.apache.james.core.Username
+import org.apache.james.events.Event.EventId
+import org.apache.james.jmap.core.State
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentCaptor
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, verify, when}
+import org.reactivestreams.Publisher
+import reactor.core.scala.publisher.SMono
+import reactor.netty.NettyOutbound
+import reactor.netty.http.websocket.WebsocketOutbound
+
+class StateChangeListenerTest {
+  private val outbound: WebsocketOutbound = mock(classOf[WebsocketOutbound])
+
+  @Test
+  def reactiveEventShouldSendAnOutboundMessage(): Unit = {
+    val event = StateChangeEvent(eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4"),
+      username = Username.of("bob"),
+      mailboxState = Some(State.fromStringUnchecked("2f9f1b12-b35a-43e6-9af2-0106fb53a943")),
+      emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
+    val listener = StateChangeListener(outbound)
+    val nettyOutbound = mock(classOf[NettyOutbound])
+    when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
+
+    listener.reactiveEvent(event)
+
+    val captor: ArgumentCaptor[Publisher[String]] = ArgumentCaptor.forClass(classOf[Publisher[String]])
+    verify(outbound).sendString(captor.capture(), any(classOf[Charset]))
+    assertThatJson(SMono(captor.getValue).block()).isEqualTo(
+      """
+        |{
+        |  "@type":"StateChange",
+        |  "changed":{
+        |    "81b637d8fcd2c6da6359e6963113a1170de795e4b725b84d1e0b4cfd9ec58ce9":{
+        |      "Mailbox":"2f9f1b12-b35a-43e6-9af2-0106fb53a943",
+        |      "Email":"2d9f1b12-b35a-43e6-9af2-0106fb53a943"
+        |    }
+        |  }
+        |}
+        |""".stripMargin)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org