You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/16 09:13:10 UTC

[james-project] branch master updated: JAMES-3756 JMAP pushes should support delegation (#1386)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b77dfe382 JAMES-3756 JMAP pushes should support delegation (#1386)
0b77dfe382 is described below

commit 0b77dfe382217c600f73ed2654897fb2ae580402
Author: Trần Hồng Quân <55...@users.noreply.github.com>
AuthorDate: Mon Jan 16 16:13:05 2023 +0700

    JAMES-3756 JMAP pushes should support delegation (#1386)
---
 .../java/org/apache/james/events/EventBus.java     |   9 ++
 .../distributed/DistributedWebSocketTest.java      |   3 +-
 .../rfc8621/contract/EventSourceContract.scala     | 146 +++++++++++++++++-
 .../jmap/rfc8621/contract/WebSocketContract.scala  | 163 ++++++++++++++++++++-
 .../jmap/rfc8621/memory/MemoryEventSourceTest.java |   3 +-
 .../jmap/rfc8621/memory/MemoryWebSocketTest.java   |   3 +-
 .../james/jmap/pushsubscription/PushListener.scala |  12 +-
 .../james/jmap/routes/EventSourceRoutes.scala      |  15 +-
 .../apache/james/jmap/routes/WebSocketRoutes.scala |  20 ++-
 .../jmap/pushsubscription/PushListenerTest.scala   |  76 +++++++++-
 10 files changed, 422 insertions(+), 28 deletions(-)

diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 11aec038cf..418af9dd77 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -27,6 +27,7 @@ import org.reactivestreams.Publisher;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public interface EventBus {
@@ -54,6 +55,14 @@ public interface EventBus {
 
     Publisher<Registration> register(EventListener.ReactiveEventListener listener, RegistrationKey key);
 
+    default Publisher<Registration> register(EventListener.ReactiveEventListener listener, Collection<RegistrationKey> keys) {
+        return Flux.fromIterable(keys)
+            .concatMap(key -> register(listener, key))
+            .reduce((reg1, reg2) -> () -> Flux.merge(reg1.unregister(), reg2.unregister()))
+            .map(unRegistrationWithMergedFlux -> () -> Mono.from(Flux.from(unRegistrationWithMergedFlux.unregister())
+                .then()));
+    }
+
     Registration register(EventListener.ReactiveEventListener listener, Group group) throws GroupAlreadyRegistered;
 
     default Registration register(EventListener listener, Group group) throws GroupAlreadyRegistered {
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java
index 6772b1516d..83cc84e857 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java
@@ -26,6 +26,7 @@ import org.apache.james.DockerOpenSearchExtension;
 import org.apache.james.JamesServerBuilder;
 import org.apache.james.JamesServerExtension;
 import org.apache.james.jmap.rfc8621.contract.WebSocketContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
 import org.apache.james.modules.AwsS3BlobStoreExtension;
 import org.apache.james.modules.RabbitMQExtension;
 import org.apache.james.modules.TestJMAPServerModule;
@@ -49,6 +50,6 @@ public class DistributedWebSocketTest implements WebSocketContract {
         .extension(new RabbitMQExtension())
         .extension(new AwsS3BlobStoreExtension())
         .server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
-            .overrideWith(new TestJMAPServerModule()))
+            .overrideWith(new TestJMAPServerModule(), new DelegationProbeModule()))
         .build();
 }
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
index 1d24f53c5c..a31f740cc2 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
@@ -27,9 +27,11 @@ import io.netty.handler.codec.http.HttpResponseStatus
 import io.restassured.RestAssured.{`given`, requestSpecification}
 import org.apache.http.HttpStatus.SC_OK
 import org.apache.james.GuiceJamesServer
+import org.apache.james.core.Username
 import org.apache.james.jmap.draft.JmapGuiceProbe
 import org.apache.james.jmap.http.UserCredential
-import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
+import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DAVID, DAVID_ACCOUNT_ID, DOMAIN, authScheme, baseRequestSpecBuilder}
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbe
 import org.apache.james.mailbox.DefaultMailboxes
 import org.apache.james.mailbox.MessageManager.AppendCommand
 import org.apache.james.mailbox.model.{MailboxPath, MessageId}
@@ -38,6 +40,7 @@ import org.apache.james.modules.MailboxProbeImpl
 import org.apache.james.modules.protocols.SmtpGuiceProbe
 import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe}
 import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.SoftAssertions
 import org.awaitility.Awaitility
 import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS
 import org.awaitility.core.ConditionFactory
@@ -62,6 +65,7 @@ trait EventSourceContract {
       .addDomain(DOMAIN.asString())
       .addUser(ANDRE.asString(), ANDRE_PASSWORD)
       .addUser(BOB.asString(), BOB_PASSWORD)
+      .addUser(DAVID.asString(), "secret")
 
     requestSpecification = baseRequestSpecBuilder(server)
       .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
@@ -307,7 +311,7 @@ trait EventSourceContract {
 
     // Bob receives a mail
     Thread.sleep(500)
-    sendEmailToBob(server)
+    sendEmailTo(server, BOB)
 
     awaitAtMostTenSeconds.untilAsserted(() => {
       assertThat(seq.asJava)
@@ -558,11 +562,145 @@ trait EventSourceContract {
       .hasSize(2)
   }
 
-  private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+  @Test
+  def shouldPushChangesToDelegatedUser(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+    val davidPath = MailboxPath.inbox(DAVID)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+    // DAVID delegates BOB to access his account
+    server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=EmailDelivery&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.boundedElastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    // DAVID has a new mail therefore EmailDelivery change
+    sendEmailTo(server, DAVID)
+
+    // Bob should receive DAVID's EmailDelivery state change
+    awaitAtMostTenSeconds.untilAsserted(() => {
+      SoftAssertions.assertSoftly(softly => {
+        softly.assertThat(seq.asJava)
+          .hasSize(1)
+        softly.assertThat(seq.head)
+          .contains("EmailDelivery", DAVID_ACCOUNT_ID)
+      })
+    })
+  }
+
+  @Test
+  def ownerUserShouldStillReceiveHisChangesWhenHeDelegatesHisAccountToOtherUsers(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+    val bobPath = MailboxPath.inbox(BOB)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
+
+    // BOB delegates DAVID to access his account
+    server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(BOB, DAVID)
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=EmailDelivery&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.boundedElastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    // BOB has a new mail therefore EmailDelivery change
+    sendEmailTo(server, BOB)
+
+    // Bob should receive his EmailDelivery state change
+    awaitAtMostTenSeconds.untilAsserted(() => {
+      SoftAssertions.assertSoftly(softly => {
+        softly.assertThat(seq.asJava)
+          .hasSize(1)
+        softly.assertThat(seq.head)
+          .contains("EmailDelivery", ACCOUNT_ID)
+      })
+    })
+  }
+
+  @Test
+  def bobShouldReceiveHisChangesAndHisDelegatedAccountChanges(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+    val davidPath = MailboxPath.inbox(DAVID)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+    // DAVID delegates BOB to access his account
+    server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=EmailDelivery&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.boundedElastic())
+      .subscribe()
+
+    Thread.sleep(500)
+    sendEmailTo(server, DAVID)
+    sendEmailTo(server, BOB)
+    sendEmailTo(server, DAVID)
+    sendEmailTo(server, BOB)
+
+    // Bob should receive David's change and his changes
+    awaitAtMostTenSeconds.untilAsserted(() => {
+      SoftAssertions.assertSoftly(softly => {
+        softly.assertThat(seq.asJava)
+          .hasSize(4)
+        softly.assertThat(seq.apply(0))
+          .contains("EmailDelivery", DAVID_ACCOUNT_ID)
+        softly.assertThat(seq.apply(1))
+          .contains("EmailDelivery", ACCOUNT_ID)
+        softly.assertThat(seq.apply(2))
+          .contains("EmailDelivery", DAVID_ACCOUNT_ID)
+        softly.assertThat(seq.apply(3))
+          .contains("EmailDelivery", ACCOUNT_ID)
+      })
+    })
+  }
+
+  private def sendEmailTo(server: GuiceJamesServer, recipient: Username): Unit = {
     val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
     smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
       .authenticate(ANDRE.asString, ANDRE_PASSWORD)
-      .sendMessage(ANDRE.asString, BOB.asString())
+      .sendMessage(ANDRE.asString, recipient.asString())
     smtpMessageSender.close()
 
     awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index dacb09c047..cd5ea01246 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -24,11 +24,13 @@ import java.util.concurrent.TimeUnit
 
 import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
 import org.apache.james.GuiceJamesServer
+import org.apache.james.core.Username
 import org.apache.james.jmap.api.change.State
 import org.apache.james.jmap.api.model.AccountId
 import org.apache.james.jmap.core.{PushState, UuidState}
 import org.apache.james.jmap.draft.JmapGuiceProbe
 import org.apache.james.jmap.rfc8621.contract.Fixture._
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbe
 import org.apache.james.mailbox.MessageManager.AppendCommand
 import org.apache.james.mailbox.model.MailboxACL.Right
 import org.apache.james.mailbox.model.{MailboxACL, MailboxPath}
@@ -73,6 +75,7 @@ trait WebSocketContract {
       .addDomain(DOMAIN.asString())
       .addUser(ANDRE.asString(), ANDRE_PASSWORD)
       .addUser(BOB.asString(), BOB_PASSWORD)
+      .addUser(DAVID.asString(), "secret")
   }
 
   @Test
@@ -486,6 +489,160 @@ trait WebSocketContract {
       .contains(stateChange)
   }
 
+  @Test
+  @Timeout(180)
+  def shouldPushChangesToDelegatedUser(server: GuiceJamesServer): Unit = {
+    val davidPath = MailboxPath.inbox(DAVID)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+    // DAVID delegates BOB to access his account
+    server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+    Thread.sleep(100)
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": ["EmailDelivery"]
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            // DAVID has a new mail therefore EmailDelivery change
+            sendEmailTo(server, DAVID)
+
+            List(
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                })
+        })
+        .send(backend)
+        .body
+
+    Thread.sleep(100)
+
+    // Bob should receive DAVID's EmailDelivery state change
+    assertThat(response.toOption.get.asJava)
+      .hasSize(1)
+
+    assertThatJson(response.toOption.get.asJava.get(0))
+      .isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+  }
+
+  @Test
+  @Timeout(180)
+  def ownerUserShouldStillReceiveHisChangesWhenHeDelegatesHisAccountToOtherUsers(server: GuiceJamesServer): Unit = {
+    val bobPath = MailboxPath.inbox(BOB)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
+
+    // BOB delegates DAVID to access his account
+    server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(BOB, DAVID)
+
+    Thread.sleep(100)
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": ["EmailDelivery"]
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            // BOB has a new mail therefore EmailDelivery change
+            sendEmailTo(server, BOB)
+
+            List(
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                })
+        })
+        .send(backend)
+        .body
+
+    Thread.sleep(100)
+
+    // Bob should receive his EmailDelivery state change
+    assertThat(response.toOption.get.asJava)
+      .hasSize(1)
+
+    assertThatJson(response.toOption.get.asJava.get(0))
+      .isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+  }
+
+  @Test
+  @Timeout(180)
+  def bobShouldReceiveHisChangesAndHisDelegatedAccountChanges(server: GuiceJamesServer): Unit = {
+    val davidPath = MailboxPath.inbox(DAVID)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+    // DAVID delegates BOB to access his account
+    server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+    Thread.sleep(100)
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": ["EmailDelivery"]
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            sendEmailTo(server, DAVID)
+            sendEmailTo(server, BOB)
+            sendEmailTo(server, DAVID)
+            sendEmailTo(server, BOB)
+
+            List(
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                },
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                },
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                },
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                })
+        })
+        .send(backend)
+        .body
+
+    Thread.sleep(100)
+
+    // Bob should receive four pushes, in send order: DAVID's, his own, DAVID's, his own
+    assertThat(response.toOption.get.asJava)
+      .hasSize(4)
+    assertThatJson(response.toOption.get.asJava.get(0))
+      .isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+    assertThatJson(response.toOption.get.asJava.get(1))
+      .isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+    assertThatJson(response.toOption.get.asJava.get(2))
+      .isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+    assertThatJson(response.toOption.get.asJava.get(3))
+      .isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+  }
+
   @Test
   @Timeout(180)
   def mixingPushAndResponsesShouldBeSupported(server: GuiceJamesServer): Unit = {
@@ -865,7 +1022,7 @@ trait WebSocketContract {
             Thread.sleep(100)
 
             // Andre send mail to Bob
-            sendEmailToBob(server)
+            sendEmailTo(server, BOB)
 
             List(
               ws.receive()
@@ -1224,11 +1381,11 @@ trait WebSocketContract {
       .header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
   }
 
-  private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+  private def sendEmailTo(server: GuiceJamesServer, recipient: Username): Unit = {
     val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
     smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
       .authenticate(ANDRE.asString, ANDRE_PASSWORD)
-      .sendMessage(ANDRE.asString, BOB.asString())
+      .sendMessage(ANDRE.asString, recipient.asString())
     smtpMessageSender.close()
 
     awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
index 867c9cc5e1..8edd7c503f 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
@@ -26,6 +26,7 @@ import org.apache.james.JamesServerExtension;
 import org.apache.james.MemoryJamesConfiguration;
 import org.apache.james.MemoryJamesServerMain;
 import org.apache.james.jmap.rfc8621.contract.EventSourceContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
 import org.apache.james.modules.TestJMAPServerModule;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -38,6 +39,6 @@ public class MemoryEventSourceTest implements EventSourceContract {
             .usersRepository(DEFAULT)
             .build())
         .server(configuration -> MemoryJamesServerMain.createServer(configuration)
-            .overrideWith(new TestJMAPServerModule()))
+            .overrideWith(new TestJMAPServerModule(), new DelegationProbeModule()))
         .build();
 }
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java
index e0027a1713..9cd2c2f863 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java
@@ -26,6 +26,7 @@ import org.apache.james.JamesServerExtension;
 import org.apache.james.MemoryJamesConfiguration;
 import org.apache.james.MemoryJamesServerMain;
 import org.apache.james.jmap.rfc8621.contract.WebSocketContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
 import org.apache.james.modules.TestJMAPServerModule;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
@@ -38,6 +39,6 @@ public class MemoryWebSocketTest implements WebSocketContract {
             .usersRepository(DEFAULT)
             .build())
         .server(configuration -> MemoryJamesServerMain.createServer(configuration)
-            .overrideWith(new TestJMAPServerModule()))
+            .overrideWith(new TestJMAPServerModule(), new DelegationProbeModule()))
         .build();
 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
index b82638282c..c64a0913b6 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
@@ -34,10 +34,11 @@ import org.apache.james.jmap.core.StateChange
 import org.apache.james.jmap.json.PushSerializer
 import org.apache.james.jmap.pushsubscription.PushListener.extractTopic
 import org.apache.james.jmap.pushsubscription.PushTopic.PushTopic
+import org.apache.james.user.api.DelegationStore
 import org.apache.james.util.ReactorUtils
 import org.reactivestreams.Publisher
 import play.api.libs.json.Json
-import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scala.publisher.SMono
 
 case class PushListenerGroup() extends Group {}
 
@@ -58,15 +59,18 @@ object PushListener {
 }
 
 class PushListener @Inject()(pushRepository: PushSubscriptionRepository,
-                   webPushClient: WebPushClient,
-                   pushSerializer: PushSerializer) extends ReactiveGroupEventListener {
+                             webPushClient: WebPushClient,
+                             pushSerializer: PushSerializer,
+                             delegationStore: DelegationStore) extends ReactiveGroupEventListener {
 
   override def getDefaultGroup: Group = PushListenerGroup()
 
   override def reactiveEvent(event: Event): Publisher[Void] =
     event match {
       case event: StateChangeEvent =>
-        SFlux(pushRepository.list(event.username))
+        SMono.just(event.username)
+          .concatWith(delegationStore.authorizedUsers(event.username))
+          .flatMap(pushRepository.list)
           .filter(_.validated)
           .flatMap(sendNotification(_, event), ReactorUtils.DEFAULT_CONCURRENCY)
           .`then`()
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
index 5e80d8d196..9bc4d1ebc3 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
@@ -30,7 +30,7 @@ import eu.timepit.refined.refineV
 import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
 import io.netty.handler.codec.http.{HttpMethod, QueryStringDecoder}
 import javax.inject.{Inject, Named}
-import org.apache.james.events.{EventBus, Registration}
+import org.apache.james.events.{EventBus, Registration, RegistrationKey}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.EVENT_SOURCE
 import org.apache.james.jmap.api.change.TypeStateFactory
@@ -44,6 +44,7 @@ import org.apache.james.jmap.json.{PushSerializer, ResponseSerializer}
 import org.apache.james.jmap.routes.PingPolicy.Interval
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
 import org.apache.james.mailbox.MailboxSession
+import org.apache.james.user.api.DelegationStore
 import play.api.libs.json.Json
 import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
 import reactor.core.publisher.{Mono, Sinks}
@@ -157,7 +158,8 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat
                                   userProvisioner: UserProvisioning,
                                   @Named(JMAPInjectionKeys.JMAP) eventBus: EventBus,
                                   pushSerializer: PushSerializer,
-                                  typeStateFactory: TypeStateFactory) extends JMAPRoutes {
+                                  typeStateFactory: TypeStateFactory,
+                                  delegationStore: DelegationStore) extends JMAPRoutes {
 
   override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
     JMAPRoute.builder
@@ -188,10 +190,11 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat
       .asFlux()
       .subscribe(ping => context.outbound.emitNext(ping, FAIL_FAST))
 
-    SMono(
-      eventBus.register(
-        StateChangeListener(options.types, context.outbound),
-        AccountIdRegistrationKey.of(session.getUser)))
+    SMono.just(session.getUser)
+      .concatWith(SFlux.fromPublisher(delegationStore.delegatedUsers(session.getUser)))
+      .map(username => AccountIdRegistrationKey.of(username).asInstanceOf[RegistrationKey])
+      .collectSeq()
+      .flatMap(keys => SMono(eventBus.register(StateChangeListener(options.types, context.outbound), keys.asJavaCollection)))
       .doOnNext(newRegistration => context.withRegistration(newRegistration))
       .subscribeOn(Schedulers.boundedElastic())
       .subscribe()
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 8e232297cd..b4eaf4a27a 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -28,19 +28,20 @@ import io.netty.handler.codec.http.HttpMethod
 import io.netty.handler.codec.http.websocketx.WebSocketFrame
 import javax.inject.{Inject, Named}
 import org.apache.james.core.Username
-import org.apache.james.events.{EventBus, Registration}
+import org.apache.james.events.{EventBus, Registration, RegistrationKey}
 import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
 import org.apache.james.jmap.JMAPUrls.JMAP_WS
 import org.apache.james.jmap.api.change.{EmailChangeRepository, MailboxChangeRepository, TypeStateFactory}
 import org.apache.james.jmap.api.model.{AccountId => JavaAccountId}
-import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, _}
-import org.apache.james.jmap.core.{OutboundMessage, ProblemDetails, RequestId, WebSocketError, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse, _}
+import org.apache.james.jmap.change._
+import org.apache.james.jmap.core._
 import org.apache.james.jmap.exceptions.UnauthorizedException
 import org.apache.james.jmap.http.rfc8621.InjectionKeys
 import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
 import org.apache.james.jmap.json.{PushSerializer, ResponseSerializer}
 import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
 import org.apache.james.mailbox.MailboxSession
+import org.apache.james.user.api.DelegationStore
 import org.slf4j.{Logger, LoggerFactory}
 import play.api.libs.json.Json
 import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
@@ -50,6 +51,8 @@ import reactor.core.scheduler.Schedulers
 import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
 import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound}
 
+import scala.jdk.CollectionConverters._
+
 object WebSocketRoutes {
   val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
 }
@@ -75,7 +78,8 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
                                  mailboxChangeRepository: MailboxChangeRepository,
                                  emailChangeRepository: EmailChangeRepository,
                                  pushSerializer: PushSerializer,
-                                 typeStateFactory: TypeStateFactory) extends JMAPRoutes {
+                                 typeStateFactory: TypeStateFactory,
+                                 delegationStore: DelegationStore) extends JMAPRoutes {
 
   override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
     JMAPRoute.builder
@@ -130,9 +134,13 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
               .map[OutboundMessage](WebSocketResponse(request.id, _))
               .onErrorResume(e => SMono.just(asError(request.id)(e)))
           case pushEnable: WebSocketPushEnable =>
-            SMono(eventBus.register(
+            SMono.just(clientContext.session.getUser)
+              .concatWith(SFlux.fromPublisher(delegationStore.delegatedUsers(clientContext.session.getUser)))
+              .map(username => AccountIdRegistrationKey.of(username).asInstanceOf[RegistrationKey])
+              .collectSeq()
+              .flatMap(keys => SMono(eventBus.register(
                 StateChangeListener(pushEnable.dataTypes.getOrElse(typeStateFactory.all.toSet), clientContext.outbound),
-                AccountIdRegistrationKey.of(clientContext.session.getUser)))
+                keys.asJavaCollection)))
               .doOnNext(newRegistration => clientContext.withRegistration(newRegistration))
               .`then`(sendPushStateIfRequested(pushEnable, clientContext))
           case WebSocketPushDisable => SMono.fromCallable(() => clientContext.clean())
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
index 5ee1bf0b04..bae016a02c 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
@@ -38,11 +38,13 @@ import org.apache.james.jmap.change.{EmailDeliveryTypeName, EmailTypeName, Mailb
 import org.apache.james.jmap.core.{AccountId, PushState, StateChange, UuidState}
 import org.apache.james.jmap.json.PushSerializer
 import org.apache.james.jmap.memory.pushsubscription.MemoryPushSubscriptionRepository
+import org.apache.james.user.api.DelegationStore
+import org.apache.james.user.memory.MemoryDelegationStore
 import org.assertj.core.api.Assertions.assertThat
 import org.assertj.core.api.SoftAssertions
 import org.junit.jupiter.api.{BeforeEach, Nested, Test}
 import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, verify, verifyNoInteractions, when}
+import org.mockito.Mockito.{mock, times, verify, verifyNoInteractions, when}
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import reactor.core.scala.publisher.SMono
 
@@ -52,11 +54,13 @@ class PushListenerTest {
   val bob: Username = Username.of("bob@localhost")
   val alice: Username = Username.of("alice@localhost")
   val bobAccountId: String = "405010d6c16c16dec36f3d7a7596c2d757ba7b57904adc4801a63e40914fd5c9"
+  val aliceAccountId: String = "93c56f4408cff66f0a929aea8e3940e753c3275e5622582ae3010e7277b7696c"
   val url: PushSubscriptionServerURL = PushSubscriptionServerURL.from("http://localhost:9999/push").get
 
   var testee: PushListener = _
   var pushSubscriptionRepository: PushSubscriptionRepository = _
   var webPushClient: WebPushClient = _
+  var delegationStore: DelegationStore = _
 
   @BeforeEach
   def setUp(): Unit = {
@@ -64,7 +68,8 @@ class PushListenerTest {
 
     pushSubscriptionRepository = new MemoryPushSubscriptionRepository(Clock.systemUTC())
     webPushClient = mock(classOf[WebPushClient])
-    testee = new PushListener(pushSubscriptionRepository, webPushClient, pushSerializer)
+    delegationStore = new MemoryDelegationStore()
+    testee = new PushListener(pushSubscriptionRepository, webPushClient, pushSerializer, delegationStore)
 
     when(webPushClient.push(any(), any())).thenReturn(SMono.empty[Unit])
   }
@@ -126,6 +131,73 @@ class PushListenerTest {
     })
   }
 
+  @Test
+  def shouldNotPushAliceChangesToBobWhenBobIsNotDelegatedByAlice(): Unit = { // Negative case: no delegation is registered in this test, so an event for alice must not reach bob's subscription.
+    val bobSubscriptionId = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest( // Only bob owns a push subscription here.
+      deviceClientId = DeviceClientId("junit1"),
+      url = url,
+      types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+    SMono(pushSubscriptionRepository.validateVerificationCode(bob, bobSubscriptionId)).block() // NOTE(review): presumably marks the subscription as verified so it is eligible for pushes - confirm against the repository contract.
+
+    val state1 = UuidState(UUID.randomUUID())
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), alice, Map(EmailTypeName -> state1)))).block() // The state change is emitted for alice, not for bob.
+
+    verify(webPushClient, times(0)).push(ArgumentMatchers.eq(url), any()) // Nothing may have been pushed to bob's subscription URL.
+  }
+
+  @Test
+  def shouldPushAliceChangesToAliceAndBobWhenBobIsDelegatedByAlice(): Unit = { // With a delegation between alice and bob, one event for alice must be pushed to both alice's and bob's subscriptions.
+    SMono.fromPublisher(delegationStore.addAuthorizedUser(alice, bob)).block() // NOTE(review): going by the test name, this grants bob access to alice's account - confirm the argument order against DelegationStore.
+
+    val bobSubscriptionId = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest( // Subscription owned by bob (device "junit1").
+      deviceClientId = DeviceClientId("junit1"),
+      url = url,
+      types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+    SMono(pushSubscriptionRepository.validateVerificationCode(bob, bobSubscriptionId)).block()
+    val aliceSubscriptionId = SMono(pushSubscriptionRepository.save(alice, PushSubscriptionCreationRequest( // Subscription owned by alice (device "junit2"), registered with the same URL as bob's.
+      deviceClientId = DeviceClientId("junit2"),
+      url = url,
+      types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+    SMono(pushSubscriptionRepository.validateVerificationCode(alice, aliceSubscriptionId)).block()
+
+    val state1 = UuidState(UUID.randomUUID())
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), alice, Map(EmailTypeName -> state1)))).block() // A single Email state change on alice's account.
+
+    val argumentCaptor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+    verify(webPushClient, times(2)).push(ArgumentMatchers.eq(url), argumentCaptor.capture()) // Two pushes expected on the shared URL: one per subscription.
+    SoftAssertions.assertSoftly(softly => { // Both payloads must be keyed by alice's account id, whichever subscription received them.
+      softly.assertThat(new String(argumentCaptor.getAllValues.get(0).payload, StandardCharsets.UTF_8))
+        .isEqualTo(s"""{"@type":"StateChange","changed":{"$aliceAccountId":{"Email":"${state1.value.toString}"}}}""")
+      softly.assertThat(new String(argumentCaptor.getAllValues.get(1).payload, StandardCharsets.UTF_8))
+        .isEqualTo(s"""{"@type":"StateChange","changed":{"$aliceAccountId":{"Email":"${state1.value.toString}"}}}""")
+    })
+  }
+
+  @Test
+  def bobShouldReceiveHisChangesAndAliceChangesWhenBobIsDelegatedByAlice(): Unit = { // Bob's single subscription must receive both his own state changes and alice's once the delegation exists.
+    SMono.fromPublisher(delegationStore.addAuthorizedUser(alice, bob)).block() // NOTE(review): going by the test name, this grants bob access to alice's account - confirm the argument order against DelegationStore.
+
+    val bobSubscriptionId = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest( // Only bob owns a subscription; alice has none in this test.
+      deviceClientId = DeviceClientId("junit1"),
+      url = url,
+      types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+    SMono(pushSubscriptionRepository.validateVerificationCode(bob, bobSubscriptionId)).block()
+
+    val stateChangeBob = UuidState(UUID.randomUUID())
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), bob, Map(EmailTypeName -> stateChangeBob)))).block() // First event: a change on bob's own account.
+    val stateChangeAlice = UuidState(UUID.randomUUID())
+    SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), alice, Map(EmailTypeName -> stateChangeAlice)))).block() // Second event: a change on alice's account.
+
+    val argumentCaptor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+    verify(webPushClient, times(2)).push(ArgumentMatchers.eq(url), argumentCaptor.capture()) // Both events must have produced a push to bob's URL.
+    SoftAssertions.assertSoftly(softly => { // Payloads are checked in the order the events were emitted: bob's account first, then alice's.
+      softly.assertThat(new String(argumentCaptor.getAllValues.get(0).payload, StandardCharsets.UTF_8))
+        .isEqualTo(s"""{"@type":"StateChange","changed":{"$bobAccountId":{"Email":"${stateChangeBob.value.toString}"}}}""")
+      softly.assertThat(new String(argumentCaptor.getAllValues.get(1).payload, StandardCharsets.UTF_8))
+        .isEqualTo(s"""{"@type":"StateChange","changed":{"$aliceAccountId":{"Email":"${stateChangeAlice.value.toString}"}}}""")
+    })
+  }
+
   @Test
   def unwantedTypesShouldBeFilteredOut(): Unit = {
     val id = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org