You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/09 04:29:46 UTC
[james-project] 18/33: JAMES-3491 Register state changes upon
WebSocketPushEnable reception
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 737e2c04c28771b10c5094d8c6b15adf1d884aa3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Fri Feb 5 10:10:56 2021 +0700
JAMES-3491 Register state changes upon WebSocketPushEnable reception
---
.../modules/protocols/JmapEventBusModule.java | 19 ++++++++
.../apache/james/jmap/api/change/JmapChange.java | 4 +-
.../jmap/change/AccountIdRegistrationKey.scala | 5 ++
.../james/jmap/change/MailboxChangeListener.scala | 8 ++--
.../org/apache/james/jmap/change/StateChange.scala | 4 +-
.../james/jmap/core/WebSocketTransport.scala | 4 +-
.../james/jmap/json/ResponseSerializer.scala | 6 +--
.../apache/james/jmap/routes/WebSocketRoutes.scala | 53 ++++++++++++++++------
8 files changed, 79 insertions(+), 24 deletions(-)
diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
index 9faba9d..0fafb26 100644
--- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/modules/protocols/JmapEventBusModule.java
@@ -1,3 +1,22 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
package org.apache.james.modules.protocols;
import org.apache.james.events.EventBus;
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java
index b7a1a2e..3767de1 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/JmapChange.java
@@ -19,6 +19,8 @@
package org.apache.james.jmap.api.change;
-public interface JmapChange {
+import org.apache.james.jmap.api.model.AccountId;
+public interface JmapChange {
+ AccountId getAccountId();
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala
index b2b969b..5bf3eba 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/AccountIdRegistrationKey.scala
@@ -19,6 +19,7 @@
package org.apache.james.jmap.change
+import org.apache.james.core.Username
import org.apache.james.events.RegistrationKey
import org.apache.james.jmap.api.model.AccountId
@@ -28,6 +29,10 @@ case class Factory() extends RegistrationKey.Factory {
override def fromString(asString: String): RegistrationKey = AccountIdRegistrationKey(AccountId.fromString(asString))
}
+object AccountIdRegistrationKey {
+ def of(username: Username): AccountIdRegistrationKey = AccountIdRegistrationKey(AccountId.fromUsername(username))
+}
+
case class AccountIdRegistrationKey(accountId: AccountId) extends RegistrationKey {
override def asString(): String = accountId.getIdentifier
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 627941f..6e57c4f 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -25,7 +25,7 @@ import javax.inject.{Inject, Named}
import org.apache.james.core.Username
import org.apache.james.events.Event.EventId
import org.apache.james.events.EventListener.ReactiveGroupEventListener
-import org.apache.james.events.{Event, EventBus, Group, RegistrationKey}
+import org.apache.james.events.{Event, EventBus, Group}
import org.apache.james.jmap.InjectionKeys
import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxChange, MailboxChangeRepository}
import org.apache.james.jmap.api.model.AccountId
@@ -101,7 +101,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
SMono(jmapChange match {
case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange)
case emailChange: EmailChange => emailChangeRepository.save(emailChange)
- }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), Set[RegistrationKey]().asJava)))
+ }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), AccountIdRegistrationKey(jmapChange.getAccountId))))
private def getSharees(mailboxId: MailboxId, username: Username): List[AccountId] = {
@@ -131,7 +131,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
case mailboxChange: MailboxChange => StateChangeEvent(
eventId = EventId.random(),
username = Username.of(mailboxChange.getAccountId.getIdentifier),
- emailState = Some(State.fromJava(mailboxChange.getState)),
- mailboxState = None)
+ emailState = None,
+ mailboxState = Some(State.fromJava(mailboxChange.getState)))
}
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
index 1b400be..312aa72 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
@@ -32,10 +32,10 @@ sealed trait TypeName {
def asString(): String
}
case object MailboxTypeName extends TypeName {
- override def asString(): String = "Mailbox"
+ override val asString: String = "Mailbox"
}
case object EmailTypeName extends TypeName {
- override def asString(): String = "Email"
+ override val asString: String = "Email"
}
case class TypeState(changes: Map[TypeName, State]) {
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
index 3d1fd01..dd40f12 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
@@ -41,4 +41,6 @@ case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutb
})
.filter(_.nonEmpty)
.map(StateChange)
-}
\ No newline at end of file
+}
+
+case class WebSocketPushEnable(dataTypes: Set[TypeName]) extends WebSocketInboundMessage
\ No newline at end of file
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index fff3c11..b1454c6 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -25,7 +25,7 @@ import java.net.URL
import eu.timepit.refined.refineV
import io.netty.handler.codec.http.HttpResponseStatus
import org.apache.james.core.Username
-import org.apache.james.jmap.change.{TypeName, TypeState}
+import org.apache.james.jmap.change.{EmailTypeName, MailboxTypeName, TypeName, TypeState}
import org.apache.james.jmap.core
import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
import org.apache.james.jmap.core.Id.IdConstraint
@@ -187,8 +187,8 @@ object ResponseSerializer {
case _ => JsError("Expecting a JsObject to represent a webSocket inbound request")
}
private implicit val typeNameReads: Reads[TypeName] = {
- case JsString(value) if value.equals(MailboxTypeName.asString()) => JsSuccess(MailboxTypeName)
- case JsString(value) if value.equals(EmailTypeName.asString()) => JsSuccess(EmailTypeName)
+ case JsString(MailboxTypeName.asString) => JsSuccess(MailboxTypeName)
+ case JsString(EmailTypeName.asString) => JsSuccess(EmailTypeName)
case _ => JsError("Expecting a JsString as typeName")
}
private implicit val webSocketPushEnableReads: Reads[WebSocketPushEnable] = Json.reads[WebSocketPushEnable]
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 4664eba..c97f4bf 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -26,13 +26,15 @@ import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.websocketx.WebSocketFrame
import javax.inject.{Inject, Named}
+import org.apache.james.events.{EventBus, Registration}
import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
import org.apache.james.jmap.JMAPUrls.JMAP_WS
-import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
import org.apache.james.jmap.http.rfc8621.InjectionKeys
import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
import org.apache.james.jmap.json.ResponseSerializer
-import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
import org.apache.james.mailbox.MailboxSession
import org.slf4j.{Logger, LoggerFactory}
import reactor.core.publisher.Mono
@@ -45,8 +47,21 @@ object WebSocketRoutes {
val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
}
+case class ClientContext(outbound: WebsocketOutbound, pushRegistration: Option[Registration], session: MailboxSession) {
+ def latest(clientContext: ClientContext): ClientContext = {
+ clean
+ clientContext
+ }
+
+ def clean: ClientContext = {
+ pushRegistration.foreach(_.unregister())
+ ClientContext(outbound, None, session)
+ }
+}
+
class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
userProvisioner: UserProvisioning,
+ @Named(JMAPInjectionKeys.JMAP) eventBus: EventBus,
jmapApi: JMAPApi) extends JMAPRoutes {
override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
@@ -70,7 +85,8 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
.`then`()
}
- private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] =
+ private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
+ val context = ClientContext(out, None, session)
SFlux[WebSocketFrame](in.aggregateFrames()
.receiveFrames())
.map(frame => {
@@ -78,27 +94,38 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
frame.content().readBytes(bytes)
new String(bytes, StandardCharsets.UTF_8)
})
- .flatMap(handleClientMessages(session))
- .onErrorResume(e => SMono.just[WebSocketOutboundMessage](asError(None)(e)))
- .map(ResponseSerializer.serialize)
- .map(_.toString)
- .flatMap(response => out.sendString(SMono.just(response), StandardCharsets.UTF_8))
+ .flatMap(message => handleClientMessages(context)(message))
+ .reduce((c1: ClientContext, c2: ClientContext) => c1.latest(c2))
+ .map[ClientContext](context => context.clean)
.`then`()
.asJava()
.`then`()
+ }
- private def handleClientMessages(session: MailboxSession)(message: String): SMono[WebSocketOutboundMessage] =
+ private def handleClientMessages(clientContext: ClientContext)(message: String): SMono[ClientContext] =
ResponseSerializer.deserializeWebSocketInboundMessage(message)
.fold(invalid => {
- val error = asError(None)(new IllegalArgumentException(invalid.toString()))
- SMono.just[WebSocketOutboundMessage](error)
+ val error = ResponseSerializer.serialize(asError(None)(new IllegalArgumentException(invalid.toString())))
+ SMono(clientContext.outbound.sendString(SMono.just(error.toString()), StandardCharsets.UTF_8)
+ .`then`())
+ .`then`(SMono.just(clientContext))
}, {
case request: WebSocketRequest =>
- jmapApi.process(request.requestObject, session)
+ jmapApi.process(request.requestObject, clientContext.session)
.map[WebSocketOutboundMessage](WebSocketResponse(request.requestId, _))
.onErrorResume(e => SMono.just(asError(request.requestId)(e)))
.subscribeOn(Schedulers.elastic)
- })
+ .onErrorResume(e => SMono.just[WebSocketOutboundMessage](asError(None)(e)))
+ .map(ResponseSerializer.serialize)
+ .map(_.toString)
+ .flatMap(response => SMono(clientContext.outbound.sendString(SMono.just(response), StandardCharsets.UTF_8).`then`()))
+ .`then`(SMono.just(clientContext))
+ case pushEnable: WebSocketPushEnable =>
+ SMono(eventBus.register(
+ StateChangeListener(pushEnable.dataTypes, clientContext.outbound),
+ AccountIdRegistrationKey.of(clientContext.session.getUser)))
+ .map((registration: Registration) => ClientContext(clientContext.outbound, Some(registration), clientContext.session))
+ })
private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] =
respondDetails(response, ProblemDetails.forThrowable(throwable))
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org