You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/02/15 02:05:46 UTC

[james-project] branch master updated: JAMES-3457 Implement JMAP eventSource

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new daa817d  JAMES-3457 Implement JMAP eventSource
daa817d is described below

commit daa817d6ceb52f0f12abfc7fe1cd6ee532ccf62a
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Feb 6 08:25:05 2021 +0700

    JAMES-3457 Implement JMAP eventSource
    
    This reuses the work introduced around WebSocket PUSH
    and wraps it around a Server Sent Event transport
    layer (over HTTP)
    
    JMAP Spec: https://jmap.io/spec-core.html#event-source
    
    Here is a little read that helped me understand SSE
    structure:
      https://www.html5rocks.com/en/tutorials/eventsource/basics/
    
    As reactor-netty supports long running HTTP requests but has
    no support for SSE by itself, a basic understanding of the
    eventStream structure is required.
    
    The current implementation proposal suffers from the following
    limitations:
    
     - It does not perform any sanitizing on the ping interval (minimum 1 second). In a perfect world this should likely be configurable, as:
        - too low values are an inefficient use of the network.
        - too high values might exceed some deployments'
          connection timeout settings, thus defeating the
          purpose of the ping.
     - It does not offer a server-state quick-resynchronization
       mechanism upon disconnects (aka handling
       `Last-Event-ID`). Similar work is ongoing for
       the WebSocket transport (`pushState`), thus
       `Last-Event-ID` will likely be implemented afterwards.
---
 .../james/jmap/rfc8621/RFC8621MethodsModule.java   |   6 +-
 .../jmap-rfc-8621-integration-tests-common/pom.xml |   4 +
 .../rfc8621/contract/CustomMethodContract.scala    |   2 +-
 .../rfc8621/contract/EventSourceContract.scala     | 386 +++++++++++++++++++++
 .../rfc8621/contract/SessionRoutesContract.scala   |   2 +-
 .../rfc8621/memory/MemoryEventSourceTest.java}     |  25 +-
 .../jmap-rfc-8621/doc/specs/spec/jmap/push.mdown   |  21 +-
 .../org/apache/james/jmap/change/StateChange.scala |  11 +
 .../james/jmap/change/StateChangeListener.scala    |   4 +-
 .../james/jmap/core/JmapRfc8621Configuration.scala |   2 +-
 .../james/jmap/core/WebSocketTransport.scala       |  11 +-
 .../james/jmap/json/ResponseSerializer.scala       |  17 +-
 .../james/jmap/routes/EventSourceRoutes.scala      | 227 ++++++++++++
 .../apache/james/jmap/routes/WebSocketRoutes.scala |   9 +-
 .../jmap/change/StateChangeListenerTest.scala      |   8 +-
 .../jmap/core/JmapRfc8621ConfigurationTest.scala   |   4 +-
 .../james/jmap/routes/SessionRoutesTest.scala      |   2 +-
 .../main/java/org/apache/james/jmap/JMAPUrls.java  |   1 +
 .../org/apache/james/jmap/http/Authenticator.java  |   4 -
 19 files changed, 700 insertions(+), 46 deletions(-)

diff --git a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
index 530c7ec..2da45af 100644
--- a/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
+++ b/server/container/guice/protocols/jmap/src/main/java/org/apache/james/jmap/rfc8621/RFC8621MethodsModule.java
@@ -51,6 +51,7 @@ import org.apache.james.jmap.method.VacationResponseGetMethod;
 import org.apache.james.jmap.method.VacationResponseSetMethod;
 import org.apache.james.jmap.method.ZoneIdProvider;
 import org.apache.james.jmap.routes.DownloadRoutes;
+import org.apache.james.jmap.routes.EventSourceRoutes;
 import org.apache.james.jmap.routes.JMAPApiRoutes;
 import org.apache.james.jmap.routes.SessionRoutes;
 import org.apache.james.jmap.routes.UploadRoutes;
@@ -102,8 +103,9 @@ public class RFC8621MethodsModule extends AbstractModule {
                                     JMAPApiRoutes jmapApiRoutes,
                                     DownloadRoutes downloadRoutes,
                                     UploadRoutes uploadRoutes,
-                                    WebSocketRoutes webSocketRoutes) {
-        return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes, webSocketRoutes);
+                                    WebSocketRoutes webSocketRoutes,
+                                    EventSourceRoutes eventSourceRoutes) {
+        return new JMAPRoutesHandler(Version.RFC8621, jmapApiRoutes, sessionRoutes, downloadRoutes, uploadRoutes, webSocketRoutes, eventSourceRoutes);
     }
 
     @Provides
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/pom.xml b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/pom.xml
index 0fabc14..8f11f0b 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/pom.xml
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/pom.xml
@@ -29,6 +29,10 @@
     <name>Apache James :: Server :: JMAP RFC-8621 :: Contract for Integration Testing</name>
     <description>JMAP RFC-8621 integration test suite common to all products</description>
 
+    <properties>
+        <cxf-version>3.4.2</cxf-version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>${james.groupId}</groupId>
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
index 0f7607c..f2f367d 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/CustomMethodContract.scala
@@ -132,7 +132,7 @@ object CustomMethodContract {
       |  "apiUrl" : "http://domain.com/jmap",
       |  "downloadUrl" : "http://domain.com/download/{accountId}/{blobId}/?type={type}&name={name}",
       |  "uploadUrl" : "http://domain.com/upload/{accountId}",
-      |  "eventSourceUrl" : "http://domain.com/eventSource",
+      |  "eventSourceUrl" : "http://domain.com/eventSource?types={types}&closeAfter={closeAfter}&ping={ping}",
       |  "state" : "2c9f1b12-b35a-43e6-9af2-0106fb53a943"
       |}""".stripMargin
 }
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
new file mode 100644
index 0000000..032afca
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
@@ -0,0 +1,386 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.rfc8621.contract
+
+import java.nio.charset.StandardCharsets
+
+import io.netty.handler.codec.http.HttpResponseStatus
+import org.apache.james.GuiceJamesServer
+import org.apache.james.jmap.draft.JmapGuiceProbe
+import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN}
+import org.apache.james.mailbox.model.MailboxPath
+import org.apache.james.modules.MailboxProbeImpl
+import org.apache.james.utils.DataProbeImpl
+import org.assertj.core.api.Assertions.assertThat
+import org.junit.jupiter.api.{BeforeEach, Test}
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.client.HttpClient
+
+import scala.collection.mutable.ListBuffer
+import scala.jdk.CollectionConverters._
+
+trait EventSourceContract {
+  @BeforeEach
+  def setUp(server: GuiceJamesServer): Unit = {
+    server.getProbe(classOf[DataProbeImpl])
+      .fluent()
+      .addDomain(DOMAIN.asString())
+      .addUser(ANDRE.asString(), ANDRE_PASSWORD)
+      .addUser(BOB.asString(), BOB_PASSWORD)
+  }
+
+  @Test
+  def typesQueryParamIsCompulsory(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val status = HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .response().block().status()
+
+    assertThat(status.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code())
+  }
+
+  @Test
+  def pingQueryParamIsCompulsory(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val status = HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .response()
+      .block()
+      .status()
+
+    assertThat(status.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code())
+  }
+
+  @Test
+  def closeAfterQueryParamIsCompulsory(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val status = HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=0")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .response()
+      .block()
+      .status()
+
+    assertThat(status.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code())
+  }
+
+  @Test
+  def shouldRejectInvalidCloseAfter(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val status = HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=0&closeAfter=bad")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .response()
+      .block()
+      .status()
+
+    assertThat(status.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code())
+  }
+
+  @Test
+  def shouldRejectInvalidPing(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val status = HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=bad&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .response()
+      .block()
+      .status()
+
+    assertThat(status.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code())
+  }
+
+  @Test
+  def shouldRejectInvalidTypes(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val status = HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=bad&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .response()
+      .block()
+      .status()
+
+    assertThat(status.code()).isEqualTo(HttpResponseStatus.BAD_REQUEST.code())
+  }
+
+  @Test
+  def shouldRejectUnauthenticated(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val status = HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Email&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .response()
+      .block()
+      .status()
+
+    assertThat(status.code()).isEqualTo(HttpResponseStatus.UNAUTHORIZED.code())
+  }
+
+  @Test
+  def noSSEEventShouldBeSentByDefault(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    Thread.sleep(500)
+
+    assertThat(seq.asJava).isEmpty()
+  }
+
+  @Test
+  def sseEventsShouldBeFilteredByTypes(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Email&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+    Thread.sleep(200)
+
+    assertThat(seq.asJava).isEmpty()
+  }
+
+  @Test
+  def allTypesShouldBeSupported(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Mailbox,Email,VacationResponse,Thread,Identity,EmailSubmission,EmailDelivery&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+    Thread.sleep(200)
+
+    assertThat(seq.asJava)
+      .hasSize(1)
+    assertThat(seq.head)
+      .startsWith("event: state\ndata: {\"@type\":\"StateChange\",\"changed\":{\"29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6\":{\"Mailbox\":")
+    assertThat(seq.head).endsWith("\n\n")
+  }
+
+  @Test
+  def pingShouldBeSupported(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=1&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    Thread.sleep(2000)
+
+    assertThat(seq.size).isGreaterThanOrEqualTo(1)
+    assertThat(seq.head).isEqualTo("event: ping\ndata: {\"interval\":1}\n\n")
+  }
+
+  @Test
+  def sseShouldTransportEvent(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(buffer => {
+        val bytes = new Array[Byte](buffer.readableBytes)
+        buffer.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+    Thread.sleep(200)
+
+    assertThat(seq.asJava)
+      .hasSize(1)
+    assertThat(seq.head)
+      .startsWith("event: state\ndata: {\"@type\":\"StateChange\",\"changed\":{\"29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6\":{\"Mailbox\":")
+    assertThat(seq.head).endsWith("\n\n")
+  }
+
+  @Test
+  def sseShouldCloseAfterEventWhenCloseAfterState(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=0&closeAfter=state")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(buffer => {
+        val bytes = new Array[Byte](buffer.readableBytes)
+        buffer.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.forUser(BOB, "other"))
+    Thread.sleep(200)
+
+    assertThat(seq.asJava)
+      .hasSize(1)
+    assertThat(seq.head)
+      .startsWith("event: state\ndata: {\"@type\":\"StateChange\",\"changed\":{\"29883977c13473ae7cb7678ef767cbfbaffc8a44a6e463d971d23a65c1dc4af6\":{\"Mailbox\":")
+    assertThat(seq.head).endsWith("\n\n")
+  }
+
+  @Test
+  def sseShouldTransportEvents(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=*&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(buffer => {
+        val bytes = new Array[Byte](buffer.readableBytes)
+        buffer.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.forUser(BOB, "other"))
+    Thread.sleep(200)
+
+    assertThat(seq.asJava)
+      .hasSize(2)
+  }
+}
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/SessionRoutesContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/SessionRoutesContract.scala
index a7ffe3d..7491ebf 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/SessionRoutesContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/SessionRoutesContract.scala
@@ -122,7 +122,7 @@ object SessionRoutesContract {
                          |  "apiUrl" : "http://domain.com/jmap",
                          |  "downloadUrl" : "http://domain.com/download/{accountId}/{blobId}/?type={type}&name={name}",
                          |  "uploadUrl" : "http://domain.com/upload/{accountId}",
-                         |  "eventSourceUrl" : "http://domain.com/eventSource",
+                         |  "eventSourceUrl" : "http://domain.com/eventSource?types={types}&closeAfter={closeAfter}&ping={ping}",
                          |  "state" : "2c9f1b12-b35a-43e6-9af2-0106fb53a943"
                          |}""".stripMargin
   private val EXPECTED_BASE_PATH: String = "/jmap"
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
similarity index 57%
copy from server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
copy to server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
index 6148383..6c81a2e 100644
--- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
@@ -17,13 +17,22 @@
  * under the License.                                           *
  ****************************************************************/
 
-package org.apache.james.jmap;
+package org.apache.james.jmap.rfc8621.memory;
 
-public interface JMAPUrls {
-    String JMAP = "/jmap";
-    String JMAP_WS = "/jmap/ws";
-    String AUTHENTICATION = "/authentication";
-    String DOWNLOAD = "/download";
-    String UPLOAD = "/upload";
-    String NOT_IMPLEMENTED = "/notImplemented";
+import static org.apache.james.MemoryJamesServerMain.IN_MEMORY_SERVER_AGGREGATE_MODULE;
+
+import org.apache.james.GuiceJamesServer;
+import org.apache.james.JamesServerBuilder;
+import org.apache.james.JamesServerExtension;
+import org.apache.james.jmap.rfc8621.contract.EventSourceContract;
+import org.apache.james.modules.TestJMAPServerModule;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+public class MemoryEventSourceTest implements EventSourceContract {
+    @RegisterExtension
+    static JamesServerExtension testExtension = new JamesServerBuilder<>(JamesServerBuilder.defaultConfigurationProvider())
+        .server(configuration -> GuiceJamesServer.forConfiguration(configuration)
+            .combineWith(IN_MEMORY_SERVER_AGGREGATE_MODULE)
+            .overrideWith(new TestJMAPServerModule()))
+        .build();
 }
diff --git a/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/push.mdown b/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/push.mdown
index 78b0e3d..dff6ac8 100644
--- a/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/push.mdown
+++ b/server/protocols/jmap-rfc-8621/doc/specs/spec/jmap/push.mdown
@@ -1,7 +1,7 @@
 # Push
 
 > :warning:
-> Not implemented yet.
+> Partially implemented.
 
 
 Push notifications allow clients to efficiently update (almost) instantly to stay in sync with data changes on the server. The general model for push is simple and sends minimal data over the push channel: just enough for the client to know whether it needs to resync. The format allows multiple changes to be coalesced into a single push update and the frequency of pushes to be rate limited by the server. It doesn't matter if some push events are dropped before they reach the client; the  [...]
@@ -46,6 +46,9 @@ If the client is itself making changes, it may receive a StateChange object whil
 
 ## PushSubscription
 
+> :warning:
+> Not implemented.
+
 Clients may create a *PushSubscription* to register a URL with the JMAP server. The JMAP server will then make an HTTP POST request to this URL for each push notification it wishes to send to the client.
 
 As a push subscription causes the JMAP server to make a number of requests to a previously unknown endpoint, it can be used as a vector for launching a denial-of-service attack. To prevent this, when a subscription is created, the JMAP server immediately sends a PushVerification object to that URL (see Section 7.2.2). The JMAP server MUST NOT make any further requests to the URL until the client receives the push and updates the subscription with the correct verification code.
@@ -223,12 +226,21 @@ The server extends the expiry time, but only again to its maximum limit of 7 day
 
 ## Event Source
 
+
+> :information_source:
+> Implemented.
+
 Clients that can hold transport connections open can connect directly to the JMAP server to receive push notifications via a `text/event-stream` resource, as described in [EventSource](https://www.w3.org/TR/eventsource/). This is a long running HTTP request, where the server can push data to the client by appending data without ending the response.
 
 When a change occurs in the data on the server, it pushes an event called `state` to any connected clients, with the *StateChange* object as the data.
 
 The server SHOULD also send a new event id that encodes the entire server state visible to the user immediately after sending a *state* event. When a new connection is made to the event-source endpoint, a client following the server-sent events specification will send a Last-Event-ID HTTP header field with the last id it saw, which the server can use to work out whether the client has missed some changes. If so, it SHOULD send these changes immediately on connection.
 
+> :warning:
+>
+> James does not offer a server-state quick-resynchronization
+> mechanism upon disconnects (aka handling `Last-Event-ID`).
+
 The Session object (see Section 2) has an *eventSourceUrl* property, which is in URI Template (level 1) format [@!RFC6570]. The URL MUST contain variables called `types`, `closeafter`, and `ping`.
 
 To connect to the resource, the client makes an authenticated GET request to the event-source URL with the appropriate variables substituted in:
@@ -247,6 +259,13 @@ To connect to the resource, the client makes an authenticated GET request to the
 
      The server MAY modify a requested ping interval to be subject to a minimum and/or maximum value. For interoperability, servers MUST NOT have a minimum allowed value higher than 30 or a maximum allowed value less than 300.
 
+> :information_source:
+> James does not perform any sanitizing on the ping interval (minimum 1 second). In a perfect world this should likely be configurable, as:
+>      - too low values are an inefficient use of the network.
+>      - too high values might exceed some deployments'
+>        connection timeout settings, thus defeating the
+>        purpose of the ping.
+
      The data for the ping event MUST be a JSON object containing an *interval* property, the value (type `UnsignedInt`) being the interval in seconds the server is using to send pings (this may be different to the requested value if the server clamped it to be within a min/max value).
 
      Clients can monitor for the ping event to help determine when the closeafter mode may be required.
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
index 08c2811..25ea386 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChange.scala
@@ -26,6 +26,17 @@ import org.apache.james.jmap.core.{AccountId, State, StateChange}
 
 object TypeName {
   val ALL: Set[TypeName] = Set(EmailTypeName, MailboxTypeName, ThreadTypeName, IdentityTypeName, EmailSubmissionTypeName, EmailDeliveryTypeName)
+
+  def parse(string: String): Either[String, TypeName] = string match {
+    case MailboxTypeName.asString => Right(MailboxTypeName)
+    case EmailTypeName.asString => Right(EmailTypeName)
+    case ThreadTypeName.asString => Right(ThreadTypeName)
+    case IdentityTypeName.asString => Right(IdentityTypeName)
+    case EmailSubmissionTypeName.asString => Right(EmailSubmissionTypeName)
+    case EmailDeliveryTypeName.asString => Right(EmailDeliveryTypeName)
+    case VacationResponseTypeName.asString => Right(VacationResponseTypeName)
+    case _ => Left(s"Unknown typeName $string")
+  }
 }
 
 sealed trait TypeName {
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
index c83f642..1dcbe82 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/StateChangeListener.scala
@@ -21,13 +21,13 @@ package org.apache.james.jmap.change
 
 import org.apache.james.events.Event
 import org.apache.james.events.EventListener.ReactiveEventListener
-import org.apache.james.jmap.core.WebSocketOutboundMessage
+import org.apache.james.jmap.core.OutboundMessage
 import org.reactivestreams.Publisher
 import reactor.core.publisher.Sinks
 import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
 import reactor.core.scala.publisher.SMono
 
-case class StateChangeListener(types: Set[TypeName], sink: Sinks.Many[WebSocketOutboundMessage]) extends ReactiveEventListener {
+case class StateChangeListener(types: Set[TypeName], sink: Sinks.Many[OutboundMessage]) extends ReactiveEventListener {
   override def reactiveEvent(event: Event): Publisher[Void] =
     event match {
       case stateChangeEvent: StateChangeEvent =>
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
index 43f8420..78f3b3e 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/JmapRfc8621Configuration.scala
@@ -47,6 +47,6 @@ case class JmapRfc8621Configuration(urlPrefixString: String, maxUploadSize: MaxS
   val apiUrl: URL = new URL(s"$urlPrefixString/jmap")
   val downloadUrl: URL = new URL(urlPrefixString + "/download/{accountId}/{blobId}/?type={type}&name={name}")
   val uploadUrl: URL = new URL(s"$urlPrefixString/upload/{accountId}")
-  val eventSourceUrl: URL = new URL(s"$urlPrefixString/eventSource")
+  val eventSourceUrl: URL = new URL(s"$urlPrefixString/eventSource?types={types}&closeAfter={closeAfter}&ping={ping}")
   val webSocketUrl: URL = new URL(s"$urlPrefixString/jmap/ws")
 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
index 4c25d4a..5597901 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/core/WebSocketTransport.scala
@@ -20,20 +20,23 @@
 package org.apache.james.jmap.core
 
 import org.apache.james.jmap.change.{TypeName, TypeState}
+import org.apache.james.jmap.routes.PingPolicy.Interval
 
 sealed trait WebSocketInboundMessage
 
-sealed trait WebSocketOutboundMessage
+sealed trait OutboundMessage
+
+case class PingMessage(interval: Interval) extends OutboundMessage
 
 case class RequestId(value: String) extends AnyVal
 
 case class WebSocketRequest(requestId: Option[RequestId], requestObject: RequestObject) extends WebSocketInboundMessage
 
-case class WebSocketResponse(requestId: Option[RequestId], responseObject: ResponseObject) extends WebSocketOutboundMessage
+case class WebSocketResponse(requestId: Option[RequestId], responseObject: ResponseObject) extends OutboundMessage
 
-case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends WebSocketOutboundMessage
+case class WebSocketError(requestId: Option[RequestId], problemDetails: ProblemDetails) extends OutboundMessage
 
-case class StateChange(changes: Map[AccountId, TypeState]) extends WebSocketOutboundMessage {
+case class StateChange(changes: Map[AccountId, TypeState]) extends OutboundMessage {
 
   def filter(types: Set[TypeName]): Option[StateChange] =
     Option(changes.flatMap {
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
index 46aa195..847acab 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/json/ResponseSerializer.scala
@@ -25,7 +25,7 @@ import java.net.URL
 import eu.timepit.refined.refineV
 import io.netty.handler.codec.http.HttpResponseStatus
 import org.apache.james.core.Username
-import org.apache.james.jmap.change.{EmailDeliveryTypeName, EmailSubmissionTypeName, EmailTypeName, IdentityTypeName, MailboxTypeName, ThreadTypeName, TypeName, TypeState, VacationResponseTypeName}
+import org.apache.james.jmap.change.{TypeName, TypeState}
 import org.apache.james.jmap.core
 import org.apache.james.jmap.core.CapabilityIdentifier.CapabilityIdentifier
 import org.apache.james.jmap.core.Id.IdConstraint
@@ -187,13 +187,8 @@ object ResponseSerializer {
     case _ => JsError("Expecting a JsObject to represent a webSocket inbound request")
   }
   private implicit val typeNameReads: Reads[TypeName] = {
-    case JsString(MailboxTypeName.asString) => JsSuccess(MailboxTypeName)
-    case JsString(EmailTypeName.asString) => JsSuccess(EmailTypeName)
-    case JsString(ThreadTypeName.asString) => JsSuccess(ThreadTypeName)
-    case JsString(IdentityTypeName.asString) => JsSuccess(IdentityTypeName)
-    case JsString(EmailSubmissionTypeName.asString) => JsSuccess(EmailSubmissionTypeName)
-    case JsString(EmailDeliveryTypeName.asString) => JsSuccess(EmailDeliveryTypeName)
-    case JsString(VacationResponseTypeName.asString) => JsSuccess(VacationResponseTypeName)
+    case JsString(string) => TypeName.parse(string)
+      .fold(errorMessage => JsError(errorMessage), JsSuccess(_))
     case _ => JsError("Expecting a JsString as typeName")
   }
   private implicit val webSocketPushEnableReads: Reads[WebSocketPushEnable] = Json.reads[WebSocketPushEnable]
@@ -232,7 +227,9 @@ object ResponseSerializer {
       "requestId" -> error.requestId.map(_.value).map(JsString).getOrElse(JsNull))
       ++ errorJson.value)
   }
-  private implicit val webSocketOutboundWrites: Writes[WebSocketOutboundMessage] = {
+  private implicit val pingWrites: Writes[PingMessage] = Json.writes[PingMessage]
+  private implicit val webSocketOutboundWrites: Writes[OutboundMessage] = {
+    case pingMessage: PingMessage => pingWrites.writes(pingMessage)
     case stateChange: StateChange => stateChangeWrites.writes(stateChange)
     case response: WebSocketResponse => webSocketResponseWrites.writes(response)
     case error: WebSocketError => webSocketErrorWrites.writes(error)
@@ -248,7 +245,7 @@ object ResponseSerializer {
 
   def serialize(errors: JsError): JsValue = Json.toJson(errors)
 
-  def serialize(outboundMessage: WebSocketOutboundMessage): JsValue = Json.toJson(outboundMessage)
+  def serialize(outboundMessage: OutboundMessage): JsValue = Json.toJson(outboundMessage)
 
   def deserializeRequestObject(input: String): JsResult[RequestObject] = Json.parse(input).validate[RequestObject]
 
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
new file mode 100644
index 0000000..b5e0723
--- /dev/null
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
@@ -0,0 +1,227 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *  http://www.apache.org/licenses/LICENSE-2.0                  *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.routes
+
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicReference
+import java.util.stream
+
+import cats.implicits._
+import eu.timepit.refined.api.Refined
+import eu.timepit.refined.numeric.Positive
+import eu.timepit.refined.refineV
+import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
+import io.netty.handler.codec.http.{HttpMethod, QueryStringDecoder}
+import javax.inject.{Inject, Named}
+import org.apache.james.events.{EventBus, Registration}
+import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
+import org.apache.james.jmap.JMAPUrls.EVENT_SOURCE
+import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, TypeName}
+import org.apache.james.jmap.core.{OutboundMessage, PingMessage, ProblemDetails, StateChange}
+import org.apache.james.jmap.http.rfc8621.InjectionKeys
+import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
+import org.apache.james.jmap.json.ResponseSerializer
+import org.apache.james.jmap.routes.PingPolicy.Interval
+import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
+import org.apache.james.mailbox.MailboxSession
+import play.api.libs.json.Json
+import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
+import reactor.core.publisher.{Mono, Sinks}
+import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scheduler.Schedulers
+import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
+
+import scala.concurrent.duration._
+import scala.jdk.CollectionConverters._
+import scala.language.postfixOps
+import scala.util.{Failure, Success, Try}
+
+object EventSourceOptions {
+  // Reads ping, closeAfter and types (in that order) from the query string; the first invalid one is returned as Left.
+  def forRequest(request: HttpServerRequest): Either[IllegalArgumentException, EventSourceOptions] =
+    retrievePing(request).flatMap(ping =>
+      retrieveCloseAfter(request).flatMap(closing =>
+        retrieveTypes(request).map(requestedTypes =>
+          EventSourceOptions(types = requestedTypes, pingPolicy = ping, closeAfter = closing))))
+
+  // Mandatory; a lone "*" selects TypeName.ALL, otherwise values are split on "," and each name goes through TypeName.parse.
+  private def retrieveTypes(request: HttpServerRequest): Either[IllegalArgumentException, Set[TypeName]] =
+    queryParam(request, "types")
+      .toRight(new IllegalArgumentException("types parameter is compulsory"))
+      .flatMap {
+        case List("*") => Right(TypeName.ALL)
+        case values => values.flatMap(_.split(","))
+          .map(name => TypeName.parse(name).left.map(message => new IllegalArgumentException(message)))
+          .sequence
+          .map(_.toSet)
+      }
+
+  // Mandatory; must carry exactly one value, which is handed to PingPolicy.parse.
+  private def retrievePing(request: HttpServerRequest): Either[IllegalArgumentException, PingPolicy] =
+    queryParam(request, "ping")
+      .toRight(new IllegalArgumentException("ping parameter is compulsory"))
+      .flatMap {
+        case value :: Nil => PingPolicy.parse(value)
+        case _ => Left(new IllegalArgumentException("ping query parameter must be constituted of a single string value"))
+      }
+
+  // Mandatory; must carry exactly one value, which is handed to CloseAfter.parse.
+  private def retrieveCloseAfter(request: HttpServerRequest): Either[IllegalArgumentException, CloseAfter] =
+    queryParam(request, "closeAfter")
+      .toRight(new IllegalArgumentException("closeAfter parameter is compulsory"))
+      .flatMap {
+        case value :: Nil => CloseAfter.parse(value)
+        case _ => Left(new IllegalArgumentException("closeAfter query parameter must be constituted of a single string value"))
+      }
+
+  private def queryParam(httpRequest: HttpServerRequest, parameterName: String): Option[List[String]] =
+    queryParam(parameterName, httpRequest.uri)
+
+  // All values decoded for the parameter, or None when the URI does not carry it.
+  private def queryParam(parameterName: String, uri: String): Option[List[String]] = {
+    val decodedParameters = new QueryStringDecoder(removeTrailingSlash(uri)).parameters
+    Option(decodedParameters.get(parameterName)).map(_.asScala.toList)
+  }
+
+  // Drops one trailing "/" from the URI; applied before the query string is decoded.
+  def removeTrailingSlash(uri: String): String = uri.stripSuffix("/")
+}
+
+// Options of one event source connection: the pushed types, the ping policy and the close policy.
+case class EventSourceOptions(types: Set[TypeName] = TypeName.ALL, pingPolicy: PingPolicy = NoPingPolicy,
+                              closeAfter: CloseAfter = NoCloseAfter)
+
+object PingPolicy {
+  type Interval = Int Refined Positive
+  // "0" yields NoPingPolicy; any other input must be a strictly positive Int, otherwise a Left is returned.
+  def parse(string: String): Either[IllegalArgumentException, PingPolicy] =
+    Try(string.toInt).toEither
+      .left.map(cause => new IllegalArgumentException(cause))
+      .flatMap {
+        case 0 => Right(NoPingPolicy)
+        case seconds => refineV[Positive](seconds)
+          .left.map(reason => new IllegalArgumentException(reason))
+          .map(positiveSeconds => PingEnabled(positiveSeconds))
+      }
+}
+// Source of the ping messages to push; NoPingPolicy never emits any.
+sealed trait PingPolicy { def asFlux(): SFlux[PingMessage] }
+case object NoPingPolicy extends PingPolicy {
+  override def asFlux(): SFlux[PingMessage] = SFlux.never[PingMessage]()
+}
+// Emits a PingMessage carrying `interval` every `interval` seconds.
+case class PingEnabled(interval: Interval) extends PingPolicy {
+  override def asFlux(): SFlux[PingMessage] =
+    SFlux.interval(interval.value.seconds, Schedulers.elastic()).map(_ => PingMessage(interval))
+}
+
+object CloseAfter {
+  // Only "no" and "state" are accepted; any other string is reported as a Left.
+  def parse(string: String): Either[IllegalArgumentException, CloseAfter] = string match {
+    case "state" => Right(CloseAfterState)
+    case "no" => Right(NoCloseAfter)
+    case other: String => Left(new IllegalArgumentException(s"$other is not a supported value for eventSource closeAfter parameter"))
+  }
+}
+// Decides when the flux of outbound messages handed to applyOn completes.
+sealed trait CloseAfter { def applyOn(flux: SFlux[OutboundMessage]): SFlux[OutboundMessage] }
+// Relays messages up to and including the first StateChange, then completes.
+case object CloseAfterState extends CloseAfter {
+  override def applyOn(flux: SFlux[OutboundMessage]): SFlux[OutboundMessage] =
+    flux.takeUntil(message => message.isInstanceOf[StateChange])
+}
+// Returns the flux untouched.
+case object NoCloseAfter extends CloseAfter {
+  override def applyOn(flux: SFlux[OutboundMessage]): SFlux[OutboundMessage] = flux
+}
+
+class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticator: Authenticator,
+                                  userProvisioner: UserProvisioning,
+                                  @Named(JMAPInjectionKeys.JMAP) eventBus: EventBus) extends JMAPRoutes {
+  // Exposes GET (handled by handleSSE) and OPTIONS (handled by JMAPRoutes.CORS_CONTROL) on the EVENT_SOURCE endpoint.
+  override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.GET, EVENT_SOURCE))
+      .action(this.handleSSE)
+      .corsHeaders,
+    JMAPRoute.builder
+      .endpoint(new Endpoint(HttpMethod.OPTIONS, EVENT_SOURCE))
+      .action(JMAPRoutes.CORS_CONTROL)
+      .corsHeaders())
+  // Validates the query options, authenticates and provisions the user, then calls registerSSE; any error is answered as problem details.
+  private def handleSSE(request: HttpServerRequest, response: HttpServerResponse): Mono[Void] =
+    EventSourceOptions.forRequest(request)
+      .fold(e => SMono.error[Void](e),
+        options => SMono(authenticator.authenticate(request))
+          .flatMap((mailboxSession: MailboxSession) => userProvisioner.provisionUser(mailboxSession)
+            .`then`
+            .`then`(registerSSE(response, mailboxSession, options))))
+      .onErrorResume(throwable => handleConnectionEstablishmentError(throwable, response))
+      .subscribeOn(Schedulers.elastic)
+      .asJava()
+      .`then`()
+  // Feeds a fresh unicast sink with pings and with state changes from the event bus, and streams it as Server-Sent Events.
+  private def registerSSE(response: HttpServerResponse, session: MailboxSession, options: EventSourceOptions): SMono[Unit] = {
+    val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
+    val context = ClientContext(sink, new AtomicReference[Registration](), session)
+
+    val pingDisposable = options.pingPolicy
+      .asFlux()
+      .subscribe(ping => context.outbound.emitNext(ping, FAIL_FAST))
+    // NOTE(review): this registration is subscribed independently of the response stream below - confirm it cannot complete after context.clean() has run.
+    SMono(
+      eventBus.register(
+        StateChangeListener(options.types, context.outbound),
+        AccountIdRegistrationKey.of(session.getUser)))
+      .doOnNext(newRegistration => context.withRegistration(newRegistration))
+      .subscribeOn(Schedulers.elastic())
+      .subscribe()
+    // The close policy is applied to the sink's flux; when the send terminates the context is cleaned and the ping subscription disposed.
+    SMono(response
+      .addHeader("Connection", "keep-alive")
+      .sse()
+      .sendString(
+        options.closeAfter.applyOn(SFlux(sink.asFlux()))
+          .map(asSSEEvent),
+        StandardCharsets.UTF_8).`then`
+      .doFinally(_ => context.clean())
+      .doFinally(_ => pingDisposable.dispose())
+      .`then`())
+      .`then`()
+  }
+  // Renders a message as an "event:"/"data:" block; NOTE(review): only PingMessage and StateChange are matched, any other OutboundMessage raises a MatchError.
+  private def asSSEEvent(outboundMessage: OutboundMessage): String = {
+    val event: String = outboundMessage match {
+      case _: PingMessage => "ping"
+      case _: StateChange => "state"
+    }
+    s"event: $event\ndata: ${Json.stringify(ResponseSerializer.serialize(outboundMessage))}\n\n"
+  }
+  // Answers a failed connection establishment with the ProblemDetails derived from the throwable.
+  private def handleConnectionEstablishmentError(throwable: Throwable, response: HttpServerResponse): SMono[Void] =
+    respondDetails(response, ProblemDetails.forThrowable(throwable))
+  // Sends the serialized problem details with the status they carry and a JSON content type.
+  private def respondDetails(response: HttpServerResponse, details: ProblemDetails): SMono[Void] =
+    SMono.fromPublisher(response.status(details.status)
+      .header(CONTENT_TYPE, JSON_CONTENT_TYPE)
+      .sendString(SMono.fromCallable(() => ResponseSerializer.serialize(details).toString),
+        StandardCharsets.UTF_8)
+      .`then`)
+}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index c45ef45..5f556a1 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -31,7 +31,7 @@ import org.apache.james.events.{EventBus, Registration}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
 import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, TypeName}
-import org.apache.james.jmap.core.{ProblemDetails, RequestId, WebSocketError, WebSocketOutboundMessage, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
+import org.apache.james.jmap.core.{OutboundMessage, ProblemDetails, RequestId, WebSocketError, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse}
 import org.apache.james.jmap.http.rfc8621.InjectionKeys
 import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
 import org.apache.james.jmap.json.ResponseSerializer
@@ -50,7 +50,7 @@ object WebSocketRoutes {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
 }
 
-case class ClientContext(outbound: Sinks.Many[WebSocketOutboundMessage], pushRegistration: AtomicReference[Registration], session: MailboxSession) {
+case class ClientContext(outbound: Sinks.Many[OutboundMessage], pushRegistration: AtomicReference[Registration], session: MailboxSession) {
   def withRegistration(registration: Registration): Unit = withRegistration(Some(registration))
 
   def clean(): Unit = withRegistration(None)
@@ -88,7 +88,7 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
   }
 
   private def handleWebSocketConnection(session: MailboxSession)(in: WebsocketInbound, out: WebsocketOutbound): Mono[Void] = {
-    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
+    val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
 
     out.sendString(
       sink.asFlux()
@@ -121,10 +121,9 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
       }, {
           case request: WebSocketRequest =>
             jmapApi.process(request.requestObject, clientContext.session)
-              .map[WebSocketOutboundMessage](WebSocketResponse(request.requestId, _))
+              .map[OutboundMessage](WebSocketResponse(request.requestId, _))
               .onErrorResume(e => SMono.just(asError(request.requestId)(e)))
               .subscribeOn(Schedulers.elastic)
-              .onErrorResume(e => SMono.just[WebSocketOutboundMessage](asError(None)(e)))
               .doOnNext(next => clientContext.outbound.emitNext(next, FAIL_FAST))
               .`then`()
           case pushEnable: WebSocketPushEnable =>
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
index 2b504bf..d2d717a 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/StateChangeListenerTest.scala
@@ -21,7 +21,7 @@ package org.apache.james.jmap.change
 
 import org.apache.james.core.Username
 import org.apache.james.events.Event.EventId
-import org.apache.james.jmap.core.{AccountId, State, StateChange, WebSocketOutboundMessage}
+import org.apache.james.jmap.core.{AccountId, OutboundMessage, State, StateChange}
 import org.assertj.core.api.Assertions.assertThat
 import org.junit.jupiter.api.Test
 import reactor.core.publisher.Sinks
@@ -36,7 +36,7 @@ class StateChangeListenerTest {
 
   @Test
   def reactiveEventShouldSendAnOutboundMessage(): Unit = {
-    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
+    val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
     val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
       mailboxState = Some(mailboxState),
@@ -56,7 +56,7 @@ class StateChangeListenerTest {
 
   @Test
   def reactiveEventShouldOmitUnwantedTypes(): Unit = {
-    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
+    val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
     val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
       mailboxState = Some(mailboxState),
@@ -75,7 +75,7 @@ class StateChangeListenerTest {
 
   @Test
   def reactiveEventShouldFilterOutUnwantedEvents(): Unit = {
-    val sink: Sinks.Many[WebSocketOutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
+    val sink: Sinks.Many[OutboundMessage] = Sinks.many().unicast().onBackpressureBuffer()
     val event = StateChangeEvent(eventId = eventId,
       username = Username.of("bob"),
       mailboxState = None,
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/core/JmapRfc8621ConfigurationTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/core/JmapRfc8621ConfigurationTest.scala
index 387f610..d2db063 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/core/JmapRfc8621ConfigurationTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/core/JmapRfc8621ConfigurationTest.scala
@@ -44,7 +44,7 @@ class JmapRfc8621ConfigurationTest extends AnyWordSpec with Matchers {
       jmapRfc8621Configuration.apiUrl must be(new URL("http://random-domain.com/jmap"))
       jmapRfc8621Configuration.downloadUrl must be(new URL("http://random-domain.com/download/{accountId}/{blobId}/?type={type}&name={name}"))
       jmapRfc8621Configuration.uploadUrl must be(new URL("http://random-domain.com/upload/{accountId}"))
-      jmapRfc8621Configuration.eventSourceUrl must be(new URL("http://random-domain.com/eventSource"))
+      jmapRfc8621Configuration.eventSourceUrl must be(new URL("http://random-domain.com/eventSource?types={types}&closeAfter={closeAfter}&ping={ping}"))
     }
 
     "load default config for urlPrefix when no configuration provided" in {
@@ -53,7 +53,7 @@ class JmapRfc8621ConfigurationTest extends AnyWordSpec with Matchers {
       jmapRfc8621Configuration.apiUrl must be(new URL("http://localhost/jmap"))
       jmapRfc8621Configuration.downloadUrl must be(new URL("http://localhost/download/{accountId}/{blobId}/?type={type}&name={name}"))
       jmapRfc8621Configuration.uploadUrl must be(new URL("http://localhost/upload/{accountId}"))
-      jmapRfc8621Configuration.eventSourceUrl must be(new URL("http://localhost/eventSource"))
+      jmapRfc8621Configuration.eventSourceUrl must be(new URL("http://localhost/eventSource?types={types}&closeAfter={closeAfter}&ping={ping}"))
     }
   }
 }
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionRoutesTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionRoutesTest.scala
index aae99de..8fb5057 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionRoutesTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/routes/SessionRoutesTest.scala
@@ -203,7 +203,7 @@ class SessionRoutesTest extends AnyFlatSpec with BeforeAndAfter with Matchers {
                          |  "apiUrl" : "$LOCALHOST_URL_PREFIX/jmap",
                          |  "downloadUrl" : "$LOCALHOST_URL_PREFIX/$downloadPath",
                          |  "uploadUrl" : "$LOCALHOST_URL_PREFIX/upload/{accountId}",
-                         |  "eventSourceUrl" : "$LOCALHOST_URL_PREFIX/eventSource",
+                         |  "eventSourceUrl" : "$LOCALHOST_URL_PREFIX/eventSource?types={types}&closeAfter={closeAfter}&ping={ping}",
                          |  "state" : "${INSTANCE.value}"
                          |}""".stripMargin
 
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
index 6148383..f409893 100644
--- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
+++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/JMAPUrls.java
@@ -25,5 +25,6 @@ public interface JMAPUrls {
     String AUTHENTICATION = "/authentication";
     String DOWNLOAD = "/download";
     String UPLOAD = "/upload";
+    String EVENT_SOURCE = "/eventSource";
     String NOT_IMPLEMENTED = "/notImplemented";
 }
diff --git a/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/Authenticator.java b/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/Authenticator.java
index cc9ab60..3ed1a12 100644
--- a/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/Authenticator.java
+++ b/server/protocols/jmap/src/main/java/org/apache/james/jmap/http/Authenticator.java
@@ -23,8 +23,6 @@ import java.util.List;
 import org.apache.james.jmap.exceptions.UnauthorizedException;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.metrics.api.MetricFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -34,8 +32,6 @@ import reactor.core.publisher.Mono;
 import reactor.netty.http.server.HttpServerRequest;
 
 public class Authenticator {
-    private static final Logger LOGGER = LoggerFactory.getLogger(Authenticator.class);
-
     public static Authenticator of(MetricFactory metricFactory, AuthenticationStrategy... authenticationStrategies) {
         return new Authenticator(ImmutableList.copyOf(authenticationStrategies), metricFactory);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org