You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:43 UTC

[james-project] 15/33: JAMES-3491 StateChangeListener should filter out unwanted events

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f5954dbd53dca8f29f7c6b326269774e8a4be67e
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Wed Feb 3 11:40:26 2021 +0700

    JAMES-3491 StateChangeListener should filter out unwanted events
    
    https://tools.ietf.org/html/rfc8887
    
    ```
    4.3.5.2.  Enabling Notifications
    
      [...]
    
       *  dataTypes: "String[]|null"
    
          A list of data type names (e.g., "Mailbox" or "Email") that the
          client is interested in.  A StateChange notification will only be
          sent if the data for one of these types changes.  Other types are
          omitted from the TypeState object.  If null, changes will be
          pushed for all supported data types.
    ```
    
    We implemented this by filtering each StateChange in memory when the notification is generated.
---
 .../org/apache/james/jmap/change/StateChange.scala |  9 ++++-
 .../james/jmap/change/StateChangeListener.scala    | 11 +++---
 .../james/jmap/core/WebSocketTransport.scala       | 12 +++++-
 .../jmap/change/StateChangeListenerTest.scala      | 46 +++++++++++++++++++++-
 4 files changed, 68 insertions(+), 10 deletions(-)

diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
index 7aa9bf1..1b400be 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
@@ -38,7 +38,14 @@ case object EmailTypeName extends TypeName {
   override def asString(): String = "Email"
 }
 
-case class TypeState(changes: Map[TypeName, State])
+case class TypeState(changes: Map[TypeName, State]) {
+
+  def filter(types: Set[TypeName]): Option[TypeState] = Option(changes.filter {
+    case (typeName, _) => types.contains(typeName)
+  })
+    .filter(_.nonEmpty)
+    .map(TypeState)
+}
 
 case class StateChangeEvent(eventId: EventId,
                             username: Username,
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index c653a8a..57fd4f2 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -30,13 +30,14 @@ import reactor.core.scala.publisher.SMono
 import reactor.core.scheduler.Schedulers
 import reactor.netty.http.websocket.WebsocketOutbound
 
-case class StateChangeListener(outbound: WebsocketOutbound) extends ReactiveEventListener {
+case class StateChangeListener(types: Set[TypeName], outbound: WebsocketOutbound) extends ReactiveEventListener {
   override def reactiveEvent(event: Event): Publisher[Void] = event match {
     case stateChangeEvent: StateChangeEvent =>
-      val stateChange = stateChangeEvent.asStateChange
-      val jsonString = Json.stringify(ResponseSerializer.serialize(stateChange))
-      SMono(outbound.sendString(SMono.just[String](jsonString), StandardCharsets.UTF_8))
-        .subscribeOn(Schedulers.elastic)
+      val stateChange = stateChangeEvent.asStateChange.filter(types)
+      val jsonString = stateChange.map(ResponseSerializer.serialize).map(Json.stringify)
+      jsonString.map(json => SMono(outbound.sendString(SMono.just[String](json), StandardCharsets.UTF_8))
+        .subscribeOn(Schedulers.elastic))
+        .getOrElse(SMono.empty)
     case _ => SMono.empty
   }
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
index b86cd07..3d1fd01 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
@@ -19,7 +19,7 @@
 
 package org.apache.james.jmap.core
 
-import org.apache.james.jmap.change.TypeState
+import org.apache.james.jmap.change.{TypeName, TypeState}
 
 sealed trait WebSocketInboundMessage
 
@@ -33,4 +33,12 @@ case class WebSocketResponse(requestId: Option[RequestId], responseObject: Respo
 
 case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends WebSocketOutboundMessage
 
-case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutboundMessage
\ No newline at end of file
+case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutboundMessage {
+
+  def filter(types: Set[TypeName]): Option[StateChange] =
+    Option(changes.flatMap {
+      case (accountId, typeState) => typeState.filter(types).map(typeState => (accountId, typeState))
+    })
+    .filter(_.nonEmpty)
+    .map(StateChange)
+}
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
index ecd83a0..d208ca2 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
@@ -28,7 +28,7 @@ import org.apache.james.jmap.core.State
 import org.junit.jupiter.api.Test
 import org.mockito.ArgumentCaptor
 import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, verify, when}
+import org.mockito.Mockito.{mock, verify, verifyZeroInteractions, when}
 import org.reactivestreams.Publisher
 import reactor.core.scala.publisher.SMono
 import reactor.netty.NettyOutbound
@@ -43,7 +43,7 @@ class StateChangeListenerTest {
       username = Username.of("bob"),
       mailboxState = Some(State.fromStringUnchecked("2f9f1b12-b35a-43e6-9af2-0106fb53a943")),
       emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
-    val listener = StateChangeListener(outbound)
+    val listener = StateChangeListener(Set(MailboxTypeName, EmailTypeName), outbound)
     val nettyOutbound = mock(classOf[NettyOutbound])
     when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
 
@@ -64,4 +64,46 @@ class StateChangeListenerTest {
         |}
         |""".stripMargin)
   }
+
+  @Test
+  def reactiveEventShouldOmitUnwantedTypes(): Unit = {
+    val event = StateChangeEvent(eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4"),
+      username = Username.of("bob"),
+      mailboxState = Some(State.fromStringUnchecked("2f9f1b12-b35a-43e6-9af2-0106fb53a943")),
+      emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
+    val listener = StateChangeListener(Set(MailboxTypeName), outbound)
+    val nettyOutbound = mock(classOf[NettyOutbound])
+    when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
+
+    listener.reactiveEvent(event)
+
+    val captor: ArgumentCaptor[Publisher[String]] = ArgumentCaptor.forClass(classOf[Publisher[String]])
+    verify(outbound).sendString(captor.capture(), any(classOf[Charset]))
+    assertThatJson(SMono(captor.getValue).block()).isEqualTo(
+      """
+        |{
+        |  "@type":"StateChange",
+        |  "changed":{
+        |    "81b637d8fcd2c6da6359e6963113a1170de795e4b725b84d1e0b4cfd9ec58ce9":{
+        |      "Mailbox":"2f9f1b12-b35a-43e6-9af2-0106fb53a943"
+        |    }
+        |  }
+        |}
+        |""".stripMargin)
+  }
+
+  @Test
+  def reactiveEventShouldFilterOutUnwantedEvents(): Unit = {
+    val event = StateChangeEvent(eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4"),
+      username = Username.of("bob"),
+      mailboxState = None,
+      emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
+    val listener = StateChangeListener(Set(MailboxTypeName), outbound)
+    val nettyOutbound = mock(classOf[NettyOutbound])
+    when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
+
+    listener.reactiveEvent(event)
+
+    verifyZeroInteractions(outbound)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org