You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/03 02:51:10 UTC
[james-project] 04/12: JAMES-3491 Implement WebSocket routes
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 57acf284e6264b0cf3078f8cb2370118602bb8bf
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jan 28 11:41:35 2021 +0700
JAMES-3491 Implement WebSocket routes
---
.../james/jmap/rfc8621/RFC8621MethodsModule.java | 9 +-
.../james/jmap/json/ResponseSerializer.scala | 11 +-
.../apache/james/jmap/routes/WebSocketRoutes.scala | 145 +++++++++++++++++++++
.../main/java/org/apache/james/jmap/JMAPUrls.java | 1 +
4 files changed, 158 insertions(+), 8 deletions(-)
diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index 4de781d..530c7ec 100644
--- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -54,6 +54,7 @@ import org.apache.james.jmap.routes.DownloadRoutes;
import org.apache.james.jmap.routes.JMAPApiRoutes;
import org.apache.james.jmap.routes.SessionRoutes;
import org.apache.james.jmap.routes.UploadRoutes;
+import org.apache.james.jmap.routes.WebSocketRoutes;
import org.apache.james.metrics.api.MetricFactory;
import org.apache.james.utils.InitializationOperation;
import org.apache.james.utils.InitilizationOperationBuilder;
@@ -97,8 +98,12 @@ public class RFC8621MethodsModule extends AbstractModule {
}
@ProvidesIntoSet
- JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes, UploadRoutes uploadRoutes) {
- return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes);
+    JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes,
+                                    JMAPApiRoutes jmapApiRoutes,
+                                    DownloadRoutes downloadRoutes,
+                                    UploadRoutes uploadRoutes,
+                                    WebSocketRoutes webSocketRoutes) {
+        // All RFC-8621 route sets, the WebSocket routes included, are handed to a single handler.
+        return new JMAPRoutesHandler(Version.RFC8621,
+            jmapApiRoutes,
+            sessionRoutes,
+            downloadRoutes,
+            uploadRoutes,
+            webSocketRoutes);
     }
@Provides
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index edace41..02ca667 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -202,6 +202,10 @@ object ResponseSerializer {
"requestId" -> error.requestId.map(_.value).map(JsString).getOrElse(JsNull))
++ errorJson.value)
}
+  /**
+   * JSON writer for any outbound WebSocket message: dispatches on the concrete subtype and
+   * delegates to the dedicated writer for responses or for errors.
+   */
+  private implicit val webSocketOutboundWrites: Writes[WebSocketOutboundMessage] = Writes {
+    case response: WebSocketResponse => webSocketResponseWrites.writes(response)
+    case error: WebSocketError => webSocketErrorWrites.writes(error)
+  }
def serialize(session: Session): JsValue = Json.toJson(session)
@@ -213,12 +217,7 @@ object ResponseSerializer {
def serialize(errors: JsError): JsValue = Json.toJson(errors)
- def serialize(response: WebSocketOutboundMessage): JsValue = {
- case response: WebSocketResponse => Json.toJson(response)
- case error: WebSocketError => Json.toJson(error)
- }
-
- def serialize(errors: WebSocketError): JsValue = Json.toJson(errors)
+  /** Serializes an outbound WebSocket message through the implicit `Writes[WebSocketOutboundMessage]`. */
+  def serialize(outboundMessage: WebSocketOutboundMessage): JsValue =
+    Json.toJson[WebSocketOutboundMessage](outboundMessage)
def deserializeRequestObject(input: String): JsResult[RequestObject] = Json.parse(input).validate[RequestObject]
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
new file mode 100644
index 0000000..c4b2900
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -0,0 +1,145 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.routes
+
+import com.fasterxml.jackson.core.JsonParseException
+import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
+import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, INTERNAL_SERVER_ERROR, UNAUTHORIZED}
+import io.netty.handler.codec.http.websocketx.WebSocketFrame
+import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
+import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
+import org.apache.james.jmap.JMAPUrls.JMAP_WS
+import org.apache.james.jmap.core.ProblemDetails.{notJSONProblem, notRequestProblem, unknownCapabilityProblem}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.exceptions.UnauthorizedException
+import org.apache.james.jmap.http.rfc8621.InjectionKeys
+import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
+import org.apache.james.jmap.json.ResponseSerializer
+import org.apache.james.jmap.routes.WebSocketRoutes.LOGGER
+import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.mailbox.MailboxSession
+import org.slf4j.{Logger, LoggerFactory}
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
+import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound}
+
+import java.nio.charset.StandardCharsets
+import java.util.stream
+import javax.inject.{Inject, Named}
+
+object WebSocketRoutes {
+  /** Logger used by [[WebSocketRoutes]] for handshake, processing and connection errors. */
+  val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
+}
+
+/**
+ * JMAP routes serving the WebSocket endpoint located at `JMAP_WS`.
+ *
+ * A `GET` request is authenticated, the user is provisioned, and the connection is then handed to
+ * `sendWebsocket`. Each text message received on the socket is deserialized, processed through
+ * `jmapApi`, and answered with a serialized [[WebSocketResponse]] or [[WebSocketError]].
+ *
+ * @param authenticator   authenticates the incoming HTTP request and yields the mailbox session
+ * @param userProvisioner invoked with the authenticated session before the socket is opened
+ * @param jmapApi         processes the request object carried by each inbound WebSocket request
+ */
+class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
+                                 userProvisioner: UserProvisioning,
+                                 jmapApi: JMAPApi) extends JMAPRoutes {
+
+  /**
+   * Declares the two routes bound to `JMAP_WS`: `GET`, handled by `handleWebSockets`, and
+   * `OPTIONS`, handled by `JMAPRoutes.CORS_CONTROL`. Both are built with `corsHeaders()`.
+   */
+  override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.GET, JMAP_WS))
+      .action(this.handleWebSockets)
+      .corsHeaders(),
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.OPTIONS, JMAP_WS))
+      .action(JMAPRoutes.CORS_CONTROL)
+      .corsHeaders())
+
+  /**
+   * Authenticates the request, provisions the user, then opens the WebSocket.
+   *
+   * Any error raised by this chain is answered by `handleHttpHandshakeError`. The whole chain is
+   * subscribed on `Schedulers.elastic`.
+   */
+  private def handleWebSockets(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] = {
+    SMono(authenticator.authenticate(httpServerRequest))
+      .flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession)
+        .`then`
+        .`then`(SMono(httpServerResponse.sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out)))))
+      .onErrorResume(throwable => handleHttpHandshakeError(throwable, httpServerResponse))
+      .subscribeOn(Schedulers.elastic)
+      .asJava()
+      .`then`()
+  }
+
+  /**
+   * Drives an established WebSocket connection for the given session.
+   *
+   * Aggregated inbound frames are decoded as UTF-8 text and passed to `handleClientMessages`;
+   * every resulting outbound message is serialized and written back as a UTF-8 string.
+   *
+   * @param session the authenticated mailbox session owning this connection
+   * @param in      inbound side of the WebSocket
+   * @param out     outbound side of the WebSocket
+   */
+  private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] =
+    SFlux[WebSocketFrame](in.aggregateFrames()
+      .receiveFrames())
+      .map(frame => {
+        // Copy the readable bytes of the frame payload out before decoding them.
+        val bytes = new Array[Byte](frame.content().readableBytes)
+        frame.content().readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .flatMap(handleClientMessages(session))
+      // An error escaping message handling is reported to the client without a request id.
+      .onErrorResume(e => SMono.just(asError(None)(e)))
+      .map(ResponseSerializer.serialize)
+      .map(_.toString)
+      .flatMap(response => out.sendString(SMono.just(response), StandardCharsets.UTF_8))
+      .onErrorResume(e => {
+        // Serialization/sending failed: nothing can be reported to the client any more, so the
+        // error is recorded through the logger (instead of stderr) and the stream completes.
+        LOGGER.error("Unexpected error while handling a WebSocket connection", e)
+        SMono.empty
+      })
+      .`then`()
+      .asJava()
+      .`then`()
+
+  /**
+   * Handles a single inbound text message.
+   *
+   * A message that fails deserialization yields an error built from an `IllegalArgumentException`
+   * with no request id. A [[WebSocketRequest]] is processed through `jmapApi` on
+   * `Schedulers.elastic`; its result is wrapped in a [[WebSocketResponse]] and a processing
+   * failure is converted by `asError`, both carrying the request id of the request.
+   */
+  private def handleClientMessages(session: MailboxSession)(message: String): SMono[WebSocketOutboundMessage] =
+    ResponseSerializer.deserializeWebSocketInboundMessage(message)
+      .fold(invalid => {
+        val error = asError(None)(new IllegalArgumentException(invalid.toString()))
+        SMono.just[WebSocketOutboundMessage](error)
+      }, {
+        case request: WebSocketRequest =>
+          jmapApi.process(request.requestObject, session)
+            .map[WebSocketOutboundMessage](WebSocketResponse(request.requestId, _))
+            .onErrorResume(e => SMono.just(asError(request.requestId)(e)))
+            .subscribeOn(Schedulers.elastic)
+      })
+
+  /**
+   * Answers an error raised by the handshake chain with an HTTP problem-details response:
+   * `UNAUTHORIZED` for an [[UnauthorizedException]] (logged at warn level),
+   * `INTERNAL_SERVER_ERROR` for anything else (logged at error level).
+   */
+  private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] = throwable match {
+    case e: UnauthorizedException =>
+      LOGGER.warn("Unauthorized", e)
+      respondDetails(response,
+        ProblemDetails(status = UNAUTHORIZED, detail = e.getMessage),
+        UNAUTHORIZED)
+    case e =>
+      LOGGER.error("Unexpected error upon WebSocket handshake request", e)
+      respondDetails(response,
+        ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage),
+        INTERNAL_SERVER_ERROR)
+  }
+
+  /**
+   * Converts a failure into the [[WebSocketError]] sent to the client.
+   *
+   * `IllegalArgumentException` maps to `notRequestProblem`, `JsonParseException` to
+   * `notJSONProblem`, `UnsupportedCapabilitiesException` to `unknownCapabilityProblem`; any other
+   * throwable is logged and reported as `INTERNAL_SERVER_ERROR`.
+   *
+   * @param requestId id of the request being answered, when one is known
+   * @param throwable the failure to report
+   */
+  private def asError(requestId: Option[RequestId])(throwable: Throwable): WebSocketError = throwable match {
+    case exception: IllegalArgumentException =>
+      WebSocketError(requestId, notRequestProblem(
+        s"The request was successfully parsed as JSON but did not match the type signature of the Request object: ${exception.getMessage}"))
+    case exception: JsonParseException =>
+      WebSocketError(requestId, notJSONProblem(
+        s"The content type of the request was not application/json or the request did not parse as I-JSON: ${exception.getMessage}"))
+    case exception: UnsupportedCapabilitiesException =>
+      WebSocketError(requestId, unknownCapabilityProblem(s"The request used unsupported capabilities: ${exception.capabilities}"))
+    case e =>
+      LOGGER.error("Unexpected error upon API request", e)
+      WebSocketError(requestId, ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage))
+  }
+
+  /**
+   * Writes `details` as the HTTP response body: sets the status code and the JSON content type,
+   * then sends the serialized problem details as a UTF-8 string.
+   *
+   * @param statusCode HTTP status of the response, `BAD_REQUEST` unless specified
+   */
+  private def respondDetails(httpServerResponse: HttpServerResponse, details: ProblemDetails, statusCode: HttpResponseStatus = BAD_REQUEST): SMono[Void] =
+    SMono.fromPublisher(httpServerResponse.status(statusCode)
+      .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
+      .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString),
+        StandardCharsets.UTF_8)
+      .`then`)
+}
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
index f7256c7..6148383 100644
--- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
+++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
@@ -21,6 +21,7 @@ package org.apache.james.jmap;
public interface JMAPUrls {
String JMAP = "/jmap";
+ String JMAP_WS = "/jmap/ws";
String AUTHENTICATION = "/authentication";
String DOWNLOAD = "/download";
String UPLOAD = "/upload";
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org