You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:42 UTC
[james-project] 14/33: JAMES-3491 Provide a basic
StateChangeListener
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit db48545bce46c66cdc8d8303e4e040269d64e99a
Author: LanKhuat <dl...@linagora.com>
AuthorDate: Wed Feb 3 09:52:26 2021 +0700
JAMES-3491 Provide a basic StateChangeListener
---
.../org/apache/james/jmap/change/StateChange.scala | 16 ++++--
...StateChange.scala => StateChangeListener.scala} | 58 ++++++++-----------
.../james/jmap/core/WebSocketTransport.scala | 6 +-
.../james/jmap/json/ResponseSerializer.scala | 11 ++++
.../scala/org/apache/james/jmap/json/package.scala | 2 +-
.../jmap/change/StateChangeListenerTest.scala | 67 ++++++++++++++++++++++
6 files changed, 118 insertions(+), 42 deletions(-)
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
index d6c555c..7aa9bf1 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
@@ -22,17 +22,21 @@ package org.apache.james.jmap.change
import org.apache.james.core.Username
import org.apache.james.events.Event
import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.{AccountId, State}
+import org.apache.james.jmap.core.{AccountId, State, StateChange}
sealed trait TypeName {
def asMap(maybeState: Option[State]): Map[TypeName, State] =
maybeState.map(state => Map[TypeName, State](this -> state))
.getOrElse(Map())
-}
-case object MailboxTypeName extends TypeName
-case object EmailTypeName extends TypeName
-case class StateChange(changes: Map[AccountId, TypeState])
+ def asString(): String
+}
+case object MailboxTypeName extends TypeName {
+ override def asString(): String = "Mailbox"
+}
+case object EmailTypeName extends TypeName {
+ override def asString(): String = "Email"
+}
case class TypeState(changes: Map[TypeName, State])
@@ -51,7 +55,7 @@ case class StateChangeEvent(eventId: EventId,
override def getUsername: Username = username
- override def isNoop: Boolean = false
+ override def isNoop: Boolean = mailboxState.isEmpty && emailState.isEmpty
override def getEventId: EventId = eventId
}
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
similarity index 52%
copy from server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
copy to server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index d6c555c..c653a8a 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -19,39 +19,29 @@
package org.apache.james.jmap.change
-import org.apache.james.core.Username
-import org.apache.james.events.Event
-import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.{AccountId, State}
+import java.nio.charset.StandardCharsets
-sealed trait TypeName {
- def asMap(maybeState: Option[State]): Map[TypeName, State] =
- maybeState.map(state => Map[TypeName, State](this -> state))
- .getOrElse(Map())
+import org.apache.james.events.Event
+import org.apache.james.events.EventListener.ReactiveEventListener
+import org.apache.james.jmap.json.ResponseSerializer
+import org.reactivestreams.Publisher
+import play.api.libs.json.Json
+import reactor.core.scala.publisher.SMono
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.websocket.WebsocketOutbound
+
+case class StateChangeListener(outbound: WebsocketOutbound) extends ReactiveEventListener {
+ override def reactiveEvent(event: Event): Publisher[Void] = event match {
+ case stateChangeEvent: StateChangeEvent =>
+ val stateChange = stateChangeEvent.asStateChange
+ val jsonString = Json.stringify(ResponseSerializer.serialize(stateChange))
+ SMono(outbound.sendString(SMono.just[String](jsonString), StandardCharsets.UTF_8))
+ .subscribeOn(Schedulers.elastic)
+ case _ => SMono.empty
+ }
+
+ override def isHandling(event: Event): Boolean = event match {
+ case _: StateChangeEvent => true
+ case _ => false
+ }
}
-case object MailboxTypeName extends TypeName
-case object EmailTypeName extends TypeName
-
-case class StateChange(changes: Map[AccountId, TypeState])
-
-case class TypeState(changes: Map[TypeName, State])
-
-case class StateChangeEvent(eventId: EventId,
- username: Username,
- mailboxState: Option[State],
- emailState: Option[State]) extends Event {
-
- def asStateChange: StateChange =
- StateChange(Map(AccountId.from(username).fold(
- failure => throw new IllegalArgumentException(failure),
- success => success) ->
- TypeState(
- MailboxTypeName.asMap(mailboxState) ++
- EmailTypeName.asMap(emailState))))
-
- override def getUsername: Username = username
-
- override def isNoop: Boolean = false
-
- override def getEventId: EventId = eventId
-}
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
index f7df48b..b86cd07 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
@@ -19,6 +19,8 @@
package org.apache.james.jmap.core
+import org.apache.james.jmap.change.TypeState
+
sealed trait WebSocketInboundMessage
sealed trait WebSocketOutboundMessage
@@ -29,4 +31,6 @@ case class WebSocketRequest(requestId: Option[RequestId], requestObject: Request
case class WebSocketResponse(requestId: Option[RequestId], responseObject: ResponseObject) extends WebSocketOutboundMessage
-case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends WebSocketOutboundMessage
\ No newline at end of file
+case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends WebSocketOutboundMessage
+
+case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutboundMessage
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index 2304565..fff3c11 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -25,6 +25,7 @@ import java.net.URL
import eu.timepit.refined.refineV
import io.netty.handler.codec.http.HttpResponseStatus
import org.apache.james.core.Username
+import org.apache.james.jmap.change.{TypeName, TypeState}
import org.apache.james.jmap.core
import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
import org.apache.james.jmap.core.Id.IdConstraint
@@ -202,6 +203,15 @@ object ResponseSerializer {
}
case _ => JsError("Expecting a JsObject to represent a webSocket inbound message")
}
+
+ private implicit val typeStateMapWrites: Writes[Map[TypeName, State]] = mapWrites[TypeName, State](_.asString(), stateWrites)
+ private implicit val typeStateWrites: Writes[TypeState] = Json.valueWrites[TypeState]
+ private implicit val changeWrites: OWrites[Map[AccountId, TypeState]] = mapWrites[AccountId, TypeState](_.id.value, typeStateWrites)
+ private implicit val stateChangeWrites: Writes[StateChange] = stateChange =>
+ JsObject(Map(
+ "@type" -> JsString("StateChange"),
+ "changed" -> changeWrites.writes(stateChange.changes)))
+
private implicit val webSocketResponseWrites: Writes[WebSocketResponse] = response => {
val apiResponseJson: JsObject = responseObjectFormat.writes(response.responseObject)
JsObject(Map(
@@ -217,6 +227,7 @@ object ResponseSerializer {
++ errorJson.value)
}
private implicit val webSocketOutboundWrites: Writes[WebSocketOutboundMessage] = {
+ case stateChange: StateChange => stateChangeWrites.writes(stateChange)
case response: WebSocketResponse => webSocketResponseWrites.writes(response)
case error: WebSocketError => webSocketErrorWrites.writes(error)
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala
index e3160cf..fb77977 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/package.scala
@@ -43,7 +43,7 @@ package object json {
case _ => JsError("Expecting mailboxId value to be a boolean")
}
- def mapWrites[K, V](keyWriter: K => String, valueWriter: Writes[V]): Writes[Map[K, V]] =
+ def mapWrites[K, V](keyWriter: K => String, valueWriter: Writes[V]): OWrites[Map[K, V]] =
(ids: Map[K, V]) => {
ids.foldLeft(JsObject.empty)((jsObject, kv) => {
val (key: K, value: V) = kv
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
new file mode 100644
index 0000000..ecd83a0
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
@@ -0,0 +1,67 @@
+/***************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.change
+
+import java.nio.charset.Charset
+
+import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
+import org.apache.james.core.Username
+import org.apache.james.events.Event.EventId
+import org.apache.james.jmap.core.State
+import org.junit.jupiter.api.Test
+import org.mockito.ArgumentCaptor
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito.{mock, verify, when}
+import org.reactivestreams.Publisher
+import reactor.core.scala.publisher.SMono
+import reactor.netty.NettyOutbound
+import reactor.netty.http.websocket.WebsocketOutbound
+
+class StateChangeListenerTest {
+ private val outbound: WebsocketOutbound = mock(classOf[WebsocketOutbound])
+
+ @Test
+ def reactiveEventShouldSendAnOutboundMessage(): Unit = {
+ val event = StateChangeEvent(eventId = EventId.of("6e0dd59d-660e-4d9b-b22f-0354479f47b4"),
+ username = Username.of("bob"),
+ mailboxState = Some(State.fromStringUnchecked("2f9f1b12-b35a-43e6-9af2-0106fb53a943")),
+ emailState = Some(State.fromStringUnchecked("2d9f1b12-b35a-43e6-9af2-0106fb53a943")))
+ val listener = StateChangeListener(outbound)
+ val nettyOutbound = mock(classOf[NettyOutbound])
+ when(outbound.sendString(any(), any())).thenReturn(nettyOutbound)
+
+ listener.reactiveEvent(event)
+
+ val captor: ArgumentCaptor[Publisher[String]] = ArgumentCaptor.forClass(classOf[Publisher[String]])
+ verify(outbound).sendString(captor.capture(), any(classOf[Charset]))
+ assertThatJson(SMono(captor.getValue).block()).isEqualTo(
+ """
+ |{
+ | "@type":"StateChange",
+ | "changed":{
+ | "81b637d8fcd2c6da6359e6963113a1170de795e4b725b84d1e0b4cfd9ec58ce9":{
+ | "Mailbox":"2f9f1b12-b35a-43e6-9af2-0106fb53a943",
+ | "Email":"2d9f1b12-b35a-43e6-9af2-0106fb53a943"
+ | }
+ | }
+ |}
+ |""".stripMargin)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org