You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/03 02:51:10 UTC

[james-project] 04/12: JAMES-3491 Implement WebSocket routes

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 57acf284e6264b0cf3078f8cb2370118602bb8bf
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Jan 28 11:41:35 2021 +0700

    JAMES-3491 Implement WebSocket routes
---
 .../james/jmap/rfc8621/RFC8621MethodsModule.java   |   9 +-
 .../james/jmap/json/ResponseSerializer.scala       |  11 +-
 .../apache/james/jmap/routes/WebSocketRoutes.scala | 145 +++++++++++++++++++++
 .../main/java/org/apache/james/jmap/JMAPUrls.java  |   1 +
 4 files changed, 158 insertions(+), 8 deletions(-)

diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index 4de781d..530c7ec 100644
--- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -54,6 +54,7 @@ import org.apache.james.jmap.routes.DownloadRoutes;
 import org.apache.james.jmap.routes.JMAPApiRoutes;
 import org.apache.james.jmap.routes.SessionRoutes;
 import org.apache.james.jmap.routes.UploadRoutes;
+import org.apache.james.jmap.routes.WebSocketRoutes;
 import org.apache.james.metrics.api.MetricFactory;
 import org.apache.james.utils.InitializationOperation;
 import org.apache.james.utils.InitilizationOperationBuilder;
@@ -97,8 +98,12 @@ public class RFC8621MethodsModule extends AbstractModule {
     }
 
     @ProvidesIntoSet
-    JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes, JMAPApiRoutes jmapApiRoutes, DownloadRoutes downloadRoutes, UploadRoutes uploadRoutes) {
-        return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes);
+    JMAPRoutesHandler routesHandler(SessionRoutes sessionRoutes,
+                                    JMAPApiRoutes jmapApiRoutes,
+                                    DownloadRoutes downloadRoutes,
+                                    UploadRoutes uploadRoutes,
+                                    WebSocketRoutes webSocketRoutes) {
+        return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes, webSocketRoutes);
     }
 
     @Provides
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index edace41..02ca667 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -202,6 +202,10 @@ object ResponseSerializer {
       "requestId" -> error.requestId.map(_.value).map(JsString).getOrElse(JsNull))
       ++ errorJson.value)
   }
+  private implicit val webSocketOutboundWrites: Writes[WebSocketOutboundMessage] = {
+    case response: WebSocketResponse => webSocketResponseWrites.writes(response)
+    case error: WebSocketError => webSocketErrorWrites.writes(error)
+  }
 
   def serialize(session: Session): JsValue = Json.toJson(session)
 
@@ -213,12 +217,7 @@ object ResponseSerializer {
 
   def serialize(errors: JsError): JsValue = Json.toJson(errors)
 
-  def serialize(response: WebSocketOutboundMessage): JsValue = {
-    case response: WebSocketResponse => Json.toJson(response)
-    case error: WebSocketError => Json.toJson(error)
-  }
-
-  def serialize(errors: WebSocketError): JsValue = Json.toJson(errors)
+  def serialize(outboundMessage: WebSocketOutboundMessage): JsValue = Json.toJson(outboundMessage)
 
   def deserializeRequestObject(input: String): JsResult[RequestObject] = Json.parse(input).validate[RequestObject]
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
new file mode 100644
index 0000000..c4b2900
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -0,0 +1,145 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.routes
+
+import com.fasterxml.jackson.core.JsonParseException
+import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
+import io.netty.handler.codec.http.HttpResponseStatus.{BAD_REQUEST, INTERNAL_SERVER_ERROR, UNAUTHORIZED}
+import io.netty.handler.codec.http.websocketx.WebSocketFrame
+import io.netty.handler.codec.http.{HttpMethod, HttpResponseStatus}
+import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
+import org.apache.james.jmap.JMAPUrls.JMAP_WS
+import org.apache.james.jmap.core.ProblemDetails.{notJSONProblem, notRequestProblem, unknownCapabilityProblem}
+import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.exceptions.UnauthorizedException
+import org.apache.james.jmap.http.rfc8621.InjectionKeys
+import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
+import org.apache.james.jmap.json.ResponseSerializer
+import org.apache.james.jmap.routes.WebSocketRoutes.LOGGER
+import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes}
+import org.apache.james.mailbox.MailboxSession
+import org.slf4j.{Logger, LoggerFactory}
+import reactor.core.publisher.Mono
+import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
+import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound}
+
+import java.nio.charset.StandardCharsets
+import java.util.stream
+import javax.inject.{Inject, Named}
+
+object WebSocketRoutes {
+  val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes]) // SLF4J logger named after the WebSocketRoutes class
+}
+
+class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
+                                 userProvisioner: UserProvisioning,
+                                 jmapApi: JMAPApi) extends JMAPRoutes {
+  /** Exposes JMAP_WS: GET is handled by handleWebSockets, OPTIONS is routed to JMAPRoutes.CORS_CONTROL. */
+  override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.GET, JMAP_WS))
+      .action(this.handleWebSockets)
+      .corsHeaders(),
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.OPTIONS, JMAP_WS))
+      .action(JMAPRoutes.CORS_CONTROL)
+      .corsHeaders())
+  /** Authenticates the request, provisions the user, then opens the WebSocket; failures go to handleHttpHandshakeError. */
+  private def handleWebSockets(httpServerRequest: HttpServerRequest, httpServerResponse: HttpServerResponse): Mono[Void] = {
+    SMono(authenticator.authenticate(httpServerRequest))
+      .flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession)
+        .`then`
+        .`then`(SMono(httpServerResponse.sendWebsocket((in, out) => handleWebSocketConnection(mailboxSession)(in, out)))))
+      .onErrorResume(throwable => handleHttpHandshakeError(throwable, httpServerResponse))
+      .subscribeOn(Schedulers.elastic)
+      .asJava()
+      .`then`()
+  }
+  /** Decodes each aggregated inbound frame as UTF-8 text, processes it, and sends the serialized reply on `out`. */
+  private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] =
+    SFlux[WebSocketFrame](in.aggregateFrames()
+      .receiveFrames())
+      .map(frame => {
+        val bytes = new Array[Byte](frame.content().readableBytes)
+        frame.content().readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .flatMap(handleClientMessages(session))
+      .onErrorResume(e => SMono.just(asError(None)(e))) // failures raised here are reported without a requestId
+      .map(ResponseSerializer.serialize)
+      .map(_.toString)
+      .flatMap(response => out.sendString(SMono.just(response), StandardCharsets.UTF_8))
+      .onErrorResume(e => {
+        LOGGER.error("Unexpected error while handling a WebSocket connection", e) // use the class logger, not stderr
+        SMono.empty
+      })
+      .`then`()
+      .asJava()
+      .`then`()
+  /** Deserializes one client message; invalid payloads become a WebSocketError, requests are run through jmapApi. */
+  private def handleClientMessages(session: MailboxSession)(message: String): SMono[WebSocketOutboundMessage] =
+    ResponseSerializer.deserializeWebSocketInboundMessage(message)
+      .fold(invalid => {
+        val error = asError(None)(new IllegalArgumentException(invalid.toString()))
+        SMono.just[WebSocketOutboundMessage](error)
+      }, {
+          case request: WebSocketRequest =>
+            jmapApi.process(request.requestObject, session)
+              .map[WebSocketOutboundMessage](WebSocketResponse(request.requestId, _))
+              .onErrorResume(e => SMono.just(asError(request.requestId)(e))) // keep the requestId in the error reply
+              .subscribeOn(Schedulers.elastic)
+        })
+  /** Answers a failed handshake over plain HTTP: UNAUTHORIZED for UnauthorizedException, INTERNAL_SERVER_ERROR otherwise. */
+  private def handleHttpHandshakeError(throwable: Throwable, response: HttpServerResponse): SMono[Void] = throwable match {
+    case e: UnauthorizedException =>
+      LOGGER.warn("Unauthorized", e)
+      respondDetails(response,
+        ProblemDetails(status = UNAUTHORIZED, detail = e.getMessage),
+        UNAUTHORIZED)
+    case e =>
+      LOGGER.error("Unexpected error upon WebSocket handshake request", e)
+      respondDetails(response,
+        ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage),
+        INTERNAL_SERVER_ERROR)
+  }
+  /** Maps a failure to the WebSocketError sent to the client; unrecognised failures are logged and reported as server errors. */
+  private def asError(requestId: Option[RequestId])(throwable: Throwable): WebSocketError = throwable match {
+    case exception: IllegalArgumentException =>
+      WebSocketError(requestId, notRequestProblem(
+        s"The request was successfully parsed as JSON but did not match the type signature of the Request object: ${exception.getMessage}"))
+    case exception: JsonParseException =>
+      WebSocketError(requestId, notJSONProblem(
+        s"The content type of the request was not application/json or the request did not parse as I-JSON: ${exception.getMessage}"))
+    case exception: UnsupportedCapabilitiesException =>
+      WebSocketError(requestId, unknownCapabilityProblem(s"The request used unsupported capabilities: ${exception.capabilities}"))
+    case e =>
+      LOGGER.error("Unexpected error upon API request", e)
+      WebSocketError(requestId, ProblemDetails(status = INTERNAL_SERVER_ERROR, detail = e.getMessage))
+  }
+  /** Writes `details` as a JSON body on the HTTP response with the given status (BAD_REQUEST by default). */
+  private def respondDetails(httpServerResponse: HttpServerResponse, details: ProblemDetails, statusCode: HttpResponseStatus = BAD_REQUEST): SMono[Void] =
+    SMono.fromPublisher(httpServerResponse.status(statusCode)
+      .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
+      .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString),
+        StandardCharsets.UTF_8)
+      .`then`)
+}
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
index f7256c7..6148383 100644
--- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
+++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
@@ -21,6 +21,7 @@ package org.apache.james.jmap;
 
 public interface JMAPUrls {
     String JMAP = "/jmap";
+    String JMAP_WS = "/jmap/ws";
     String AUTHENTICATION = "/authentication";
     String DOWNLOAD = "/download";
     String UPLOAD = "/upload";


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org