You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2023/01/16 09:13:10 UTC
[james-project] branch master updated: JAMES-3756 JMAP pushes should support delegation (#1386)
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 0b77dfe382 JAMES-3756 JMAP pushes should support delegation (#1386)
0b77dfe382 is described below
commit 0b77dfe382217c600f73ed2654897fb2ae580402
Author: Trần Hồng Quân <55...@users.noreply.github.com>
AuthorDate: Mon Jan 16 16:13:05 2023 +0700
JAMES-3756 JMAP pushes should support delegation (#1386)
---
.../java/org/apache/james/events/EventBus.java | 9 ++
.../distributed/DistributedWebSocketTest.java | 3 +-
.../rfc8621/contract/EventSourceContract.scala | 146 +++++++++++++++++-
.../jmap/rfc8621/contract/WebSocketContract.scala | 163 ++++++++++++++++++++-
.../jmap/rfc8621/memory/MemoryEventSourceTest.java | 3 +-
.../jmap/rfc8621/memory/MemoryWebSocketTest.java | 3 +-
.../james/jmap/pushsubscription/PushListener.scala | 12 +-
.../james/jmap/routes/EventSourceRoutes.scala | 15 +-
.../apache/james/jmap/routes/WebSocketRoutes.scala | 20 ++-
.../jmap/pushsubscription/PushListenerTest.scala | 76 +++++++++-
10 files changed, 422 insertions(+), 28 deletions(-)
diff --git a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
index 11aec038cf..418af9dd77 100644
--- a/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
+++ b/event-bus/api/src/main/java/org/apache/james/events/EventBus.java
@@ -27,6 +27,7 @@ import org.reactivestreams.Publisher;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public interface EventBus {
@@ -54,6 +55,14 @@ public interface EventBus {
Publisher<Registration> register(EventListener.ReactiveEventListener listener, RegistrationKey key);
+ default Publisher<Registration> register(EventListener.ReactiveEventListener listener, Collection<RegistrationKey> keys) {
+ return Flux.fromIterable(keys)
+ .concatMap(key -> register(listener, key))
+ .reduce((reg1, reg2) -> () -> Flux.merge(reg1.unregister(), reg2.unregister()))
+ .map(unRegistrationWithMergedFlux -> () -> Mono.from(Flux.from(unRegistrationWithMergedFlux.unregister())
+ .then()));
+ }
+
Registration register(EventListener.ReactiveEventListener listener, Group group) throws GroupAlreadyRegistered;
default Registration register(EventListener listener, Group group) throws GroupAlreadyRegistered {
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java
index 6772b1516d..83cc84e857 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/distributed-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/distributed/DistributedWebSocketTest.java
@@ -26,6 +26,7 @@ import org.apache.james.DockerOpenSearchExtension;
import org.apache.james.JamesServerBuilder;
import org.apache.james.JamesServerExtension;
import org.apache.james.jmap.rfc8621.contract.WebSocketContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
import org.apache.james.modules.AwsS3BlobStoreExtension;
import org.apache.james.modules.RabbitMQExtension;
import org.apache.james.modules.TestJMAPServerModule;
@@ -49,6 +50,6 @@ public class DistributedWebSocketTest implements WebSocketContract {
.extension(new RabbitMQExtension())
.extension(new AwsS3BlobStoreExtension())
.server(configuration -> CassandraRabbitMQJamesServerMain.createServer(configuration)
- .overrideWith(new TestJMAPServerModule()))
+ .overrideWith(new TestJMAPServerModule(), new DelegationProbeModule()))
.build();
}
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
index 1d24f53c5c..a31f740cc2 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
@@ -27,9 +27,11 @@ import io.netty.handler.codec.http.HttpResponseStatus
import io.restassured.RestAssured.{`given`, requestSpecification}
import org.apache.http.HttpStatus.SC_OK
import org.apache.james.GuiceJamesServer
+import org.apache.james.core.Username
import org.apache.james.jmap.draft.JmapGuiceProbe
import org.apache.james.jmap.http.UserCredential
-import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
+import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DAVID, DAVID_ACCOUNT_ID, DOMAIN, authScheme, baseRequestSpecBuilder}
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbe
import org.apache.james.mailbox.DefaultMailboxes
import org.apache.james.mailbox.MessageManager.AppendCommand
import org.apache.james.mailbox.model.{MailboxPath, MessageId}
@@ -38,6 +40,7 @@ import org.apache.james.modules.MailboxProbeImpl
import org.apache.james.modules.protocols.SmtpGuiceProbe
import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe}
import org.assertj.core.api.Assertions.assertThat
+import org.assertj.core.api.SoftAssertions
import org.awaitility.Awaitility
import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS
import org.awaitility.core.ConditionFactory
@@ -62,6 +65,7 @@ trait EventSourceContract {
.addDomain(DOMAIN.asString())
.addUser(ANDRE.asString(), ANDRE_PASSWORD)
.addUser(BOB.asString(), BOB_PASSWORD)
+ .addUser(DAVID.asString(), "secret")
requestSpecification = baseRequestSpecBuilder(server)
.setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
@@ -307,7 +311,7 @@ trait EventSourceContract {
// Bob receives a mail
Thread.sleep(500)
- sendEmailToBob(server)
+ sendEmailTo(server, BOB)
awaitAtMostTenSeconds.untilAsserted(() => {
assertThat(seq.asJava)
@@ -558,11 +562,145 @@ trait EventSourceContract {
.hasSize(2)
}
- private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+ @Test
+ def shouldPushChangesToDelegatedUser(server: GuiceJamesServer): Unit = {
+ val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+ val davidPath = MailboxPath.inbox(DAVID)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+ // DAVID delegates BOB to access his account
+ server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+ val seq = new ListBuffer[String]()
+ HttpClient.create
+ .baseUrl(s"http://127.0.0.1:$port/eventSource?types=EmailDelivery&ping=0&closeAfter=no")
+ .headers(builder => {
+ builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+ builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+ })
+ .get()
+ .responseContent()
+ .map(bb => {
+ val bytes = new Array[Byte](bb.readableBytes)
+ bb.readBytes(bytes)
+ new String(bytes, StandardCharsets.UTF_8)
+ })
+ .doOnNext(seq.addOne)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe()
+
+ Thread.sleep(500)
+ // DAVID has a new mail therefore EmailDelivery change
+ sendEmailTo(server, DAVID)
+
+ // Bob should receive DAVID's EmailDelivery state change
+ awaitAtMostTenSeconds.untilAsserted(() => {
+ SoftAssertions.assertSoftly(softly => {
+ softly.assertThat(seq.asJava)
+ .hasSize(1)
+ softly.assertThat(seq.head)
+ .contains("EmailDelivery", DAVID_ACCOUNT_ID)
+ })
+ })
+ }
+
+ @Test
+ def ownerUserShouldStillReceiveHisChangesWhenHeDelegatesHisAccountToOtherUsers(server: GuiceJamesServer): Unit = {
+ val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+ val bobPath = MailboxPath.inbox(BOB)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
+
+ // BOB delegates DAVID to access his account
+ server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(BOB, DAVID)
+
+ val seq = new ListBuffer[String]()
+ HttpClient.create
+ .baseUrl(s"http://127.0.0.1:$port/eventSource?types=EmailDelivery&ping=0&closeAfter=no")
+ .headers(builder => {
+ builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+ builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+ })
+ .get()
+ .responseContent()
+ .map(bb => {
+ val bytes = new Array[Byte](bb.readableBytes)
+ bb.readBytes(bytes)
+ new String(bytes, StandardCharsets.UTF_8)
+ })
+ .doOnNext(seq.addOne)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe()
+
+ Thread.sleep(500)
+ // BOB has a new mail therefore EmailDelivery change
+ sendEmailTo(server, BOB)
+
+ // Bob should receive his EmailDelivery state change
+ awaitAtMostTenSeconds.untilAsserted(() => {
+ SoftAssertions.assertSoftly(softly => {
+ softly.assertThat(seq.asJava)
+ .hasSize(1)
+ softly.assertThat(seq.head)
+ .contains("EmailDelivery", ACCOUNT_ID)
+ })
+ })
+ }
+
+ @Test
+ def bobShouldReceiveHisChangesAndHisDelegatedAccountChanges(server: GuiceJamesServer): Unit = {
+ val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+ val davidPath = MailboxPath.inbox(DAVID)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+ // DAVID delegates BOB to access his account
+ server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+ val seq = new ListBuffer[String]()
+ HttpClient.create
+ .baseUrl(s"http://127.0.0.1:$port/eventSource?types=EmailDelivery&ping=0&closeAfter=no")
+ .headers(builder => {
+ builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+ builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+ })
+ .get()
+ .responseContent()
+ .map(bb => {
+ val bytes = new Array[Byte](bb.readableBytes)
+ bb.readBytes(bytes)
+ new String(bytes, StandardCharsets.UTF_8)
+ })
+ .doOnNext(seq.addOne)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe()
+
+ Thread.sleep(500)
+ sendEmailTo(server, DAVID)
+ sendEmailTo(server, BOB)
+ sendEmailTo(server, DAVID)
+ sendEmailTo(server, BOB)
+
+ // Bob should receive David's change and his changes
+ awaitAtMostTenSeconds.untilAsserted(() => {
+ SoftAssertions.assertSoftly(softly => {
+ softly.assertThat(seq.asJava)
+ .hasSize(4)
+ softly.assertThat(seq.apply(0))
+ .contains("EmailDelivery", DAVID_ACCOUNT_ID)
+ softly.assertThat(seq.apply(1))
+ .contains("EmailDelivery", ACCOUNT_ID)
+ softly.assertThat(seq.apply(2))
+ .contains("EmailDelivery", DAVID_ACCOUNT_ID)
+ softly.assertThat(seq.apply(3))
+ .contains("EmailDelivery", ACCOUNT_ID)
+ })
+ })
+ }
+
+ private def sendEmailTo(server: GuiceJamesServer, recipient: Username): Unit = {
val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
.authenticate(ANDRE.asString, ANDRE_PASSWORD)
- .sendMessage(ANDRE.asString, BOB.asString())
+ .sendMessage(ANDRE.asString, recipient.asString())
smtpMessageSender.close()
awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index dacb09c047..cd5ea01246 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -24,11 +24,13 @@ import java.util.concurrent.TimeUnit
import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
import org.apache.james.GuiceJamesServer
+import org.apache.james.core.Username
import org.apache.james.jmap.api.change.State
import org.apache.james.jmap.api.model.AccountId
import org.apache.james.jmap.core.{PushState, UuidState}
import org.apache.james.jmap.draft.JmapGuiceProbe
import org.apache.james.jmap.rfc8621.contract.Fixture._
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbe
import org.apache.james.mailbox.MessageManager.AppendCommand
import org.apache.james.mailbox.model.MailboxACL.Right
import org.apache.james.mailbox.model.{MailboxACL, MailboxPath}
@@ -73,6 +75,7 @@ trait WebSocketContract {
.addDomain(DOMAIN.asString())
.addUser(ANDRE.asString(), ANDRE_PASSWORD)
.addUser(BOB.asString(), BOB_PASSWORD)
+ .addUser(DAVID.asString(), "secret")
}
@Test
@@ -486,6 +489,160 @@ trait WebSocketContract {
.contains(stateChange)
}
+ @Test
+ @Timeout(180)
+ def shouldPushChangesToDelegatedUser(server: GuiceJamesServer): Unit = {
+ val davidPath = MailboxPath.inbox(DAVID)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+ // DAVID delegates BOB to access his account
+ server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+ Thread.sleep(100)
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": ["EmailDelivery"]
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ // DAVID has a new mail therefore EmailDelivery change
+ sendEmailTo(server, DAVID)
+
+ List(
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ })
+ })
+ .send(backend)
+ .body
+
+ Thread.sleep(100)
+
+ // Bob should receive DAVID's EmailDelivery state change
+ assertThat(response.toOption.get.asJava)
+ .hasSize(1)
+
+ assertThatJson(response.toOption.get.asJava.get(0))
+ .isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+ }
+
+ @Test
+ @Timeout(180)
+ def ownerUserShouldStillReceiveHisChangesWhenHeDelegatesHisAccountToOtherUsers(server: GuiceJamesServer): Unit = {
+ val bobPath = MailboxPath.inbox(BOB)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobPath)
+
+ // BOB delegates DAVID to access his account
+ server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(BOB, DAVID)
+
+ Thread.sleep(100)
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": ["EmailDelivery"]
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ // BOB has a new mail therefore EmailDelivery change
+ sendEmailTo(server, BOB)
+
+ List(
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ })
+ })
+ .send(backend)
+ .body
+
+ Thread.sleep(100)
+
+ // Bob should receive his EmailDelivery state change
+ assertThat(response.toOption.get.asJava)
+ .hasSize(1)
+
+ assertThatJson(response.toOption.get.asJava.get(0))
+ .isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+ }
+
+ @Test
+ @Timeout(180)
+ def bobShouldReceiveHisChangesAndHisDelegatedAccountChanges(server: GuiceJamesServer): Unit = {
+ val davidPath = MailboxPath.inbox(DAVID)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(davidPath)
+
+ // DAVID delegates BOB to access his account
+ server.getProbe(classOf[DelegationProbe]).addAuthorizedUser(DAVID, BOB)
+
+ Thread.sleep(100)
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": ["EmailDelivery"]
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ sendEmailTo(server, DAVID)
+ sendEmailTo(server, BOB)
+ sendEmailTo(server, DAVID)
+ sendEmailTo(server, BOB)
+
+ List(
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ },
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ },
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ },
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ })
+ })
+ .send(backend)
+ .body
+
+ Thread.sleep(100)
+
+ // Bob should receive four state changes, alternating DAVID's (delegated) account and his own, in send order
+ assertThat(response.toOption.get.asJava)
+ .hasSize(4)
+ assertThatJson(response.toOption.get.asJava.get(0))
+ .isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+ assertThatJson(response.toOption.get.asJava.get(1))
+ .isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+ assertThatJson(response.toOption.get.asJava.get(2))
+ .isEqualTo(s"""{"@type":"StateChange","changed":{"$DAVID_ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+ assertThatJson(response.toOption.get.asJava.get(3))
+ .isEqualTo(s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"EmailDelivery":"$${json-unit.ignore}"}},"pushState":"$${json-unit.ignore}"}""".stripMargin)
+ }
+
@Test
@Timeout(180)
def mixingPushAndResponsesShouldBeSupported(server: GuiceJamesServer): Unit = {
@@ -865,7 +1022,7 @@ trait WebSocketContract {
Thread.sleep(100)
// Andre send mail to Bob
- sendEmailToBob(server)
+ sendEmailTo(server, BOB)
List(
ws.receive()
@@ -1224,11 +1381,11 @@ trait WebSocketContract {
.header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
}
- private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+ private def sendEmailTo(server: GuiceJamesServer, recipient: Username): Unit = {
val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
.authenticate(ANDRE.asString, ANDRE_PASSWORD)
- .sendMessage(ANDRE.asString, BOB.asString())
+ .sendMessage(ANDRE.asString, recipient.asString())
smtpMessageSender.close()
awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
index 867c9cc5e1..8edd7c503f 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryEventSourceTest.java
@@ -26,6 +26,7 @@ import org.apache.james.JamesServerExtension;
import org.apache.james.MemoryJamesConfiguration;
import org.apache.james.MemoryJamesServerMain;
import org.apache.james.jmap.rfc8621.contract.EventSourceContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
import org.apache.james.modules.TestJMAPServerModule;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,6 +39,6 @@ public class MemoryEventSourceTest implements EventSourceContract {
.usersRepository(DEFAULT)
.build())
.server(configuration -> MemoryJamesServerMain.createServer(configuration)
- .overrideWith(new TestJMAPServerModule()))
+ .overrideWith(new TestJMAPServerModule(), new DelegationProbeModule()))
.build();
}
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java
index e0027a1713..9cd2c2f863 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java
+++ b/server/protocols/jmap-rfc-8621-integration-tests/memory-jmap-rfc-8621-integration-tests/src/test/java/org/apache/james/jmap/rfc8621/memory/MemoryWebSocketTest.java
@@ -26,6 +26,7 @@ import org.apache.james.JamesServerExtension;
import org.apache.james.MemoryJamesConfiguration;
import org.apache.james.MemoryJamesServerMain;
import org.apache.james.jmap.rfc8621.contract.WebSocketContract;
+import org.apache.james.jmap.rfc8621.contract.probe.DelegationProbeModule;
import org.apache.james.modules.TestJMAPServerModule;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -38,6 +39,6 @@ public class MemoryWebSocketTest implements WebSocketContract {
.usersRepository(DEFAULT)
.build())
.server(configuration -> MemoryJamesServerMain.createServer(configuration)
- .overrideWith(new TestJMAPServerModule()))
+ .overrideWith(new TestJMAPServerModule(), new DelegationProbeModule()))
.build();
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
index b82638282c..c64a0913b6 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/pushsubscription/PushListener.scala
@@ -34,10 +34,11 @@ import org.apache.james.jmap.core.StateChange
import org.apache.james.jmap.json.PushSerializer
import org.apache.james.jmap.pushsubscription.PushListener.extractTopic
import org.apache.james.jmap.pushsubscription.PushTopic.PushTopic
+import org.apache.james.user.api.DelegationStore
import org.apache.james.util.ReactorUtils
import org.reactivestreams.Publisher
import play.api.libs.json.Json
-import reactor.core.scala.publisher.{SFlux, SMono}
+import reactor.core.scala.publisher.SMono
case class PushListenerGroup() extends Group {}
@@ -58,15 +59,18 @@ object PushListener {
}
class PushListener @Inject()(pushRepository: PushSubscriptionRepository,
- webPushClient: WebPushClient,
- pushSerializer: PushSerializer) extends ReactiveGroupEventListener {
+ webPushClient: WebPushClient,
+ pushSerializer: PushSerializer,
+ delegationStore: DelegationStore) extends ReactiveGroupEventListener {
override def getDefaultGroup: Group = PushListenerGroup()
override def reactiveEvent(event: Event): Publisher[Void] =
event match {
case event: StateChangeEvent =>
- SFlux(pushRepository.list(event.username))
+ SMono.just(event.username)
+ .concatWith(delegationStore.authorizedUsers(event.username))
+ .flatMap(pushRepository.list)
.filter(_.validated)
.flatMap(sendNotification(_, event), ReactorUtils.DEFAULT_CONCURRENCY)
.`then`()
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
index 5e80d8d196..9bc4d1ebc3 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/EventSourceRoutes.scala
@@ -30,7 +30,7 @@ import eu.timepit.refined.refineV
import io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE
import io.netty.handler.codec.http.{HttpMethod, QueryStringDecoder}
import javax.inject.{Inject, Named}
-import org.apache.james.events.{EventBus, Registration}
+import org.apache.james.events.{EventBus, Registration, RegistrationKey}
import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
import org.apache.james.jmap.JMAPUrls.EVENT_SOURCE
import org.apache.james.jmap.api.change.TypeStateFactory
@@ -44,6 +44,7 @@ import org.apache.james.jmap.json.{PushSerializer, ResponseSerializer}
import org.apache.james.jmap.routes.PingPolicy.Interval
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
import org.apache.james.mailbox.MailboxSession
+import org.apache.james.user.api.DelegationStore
import play.api.libs.json.Json
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
import reactor.core.publisher.{Mono, Sinks}
@@ -157,7 +158,8 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat
userProvisioner: UserProvisioning,
@Named(JMAPInjectionKeys.JMAP) eventBus: EventBus,
pushSerializer: PushSerializer,
- typeStateFactory: TypeStateFactory) extends JMAPRoutes {
+ typeStateFactory: TypeStateFactory,
+ delegationStore: DelegationStore) extends JMAPRoutes {
override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
JMAPRoute.builder
@@ -188,10 +190,11 @@ class EventSourceRoutes@Inject() (@Named(InjectionKeys.RFC_8621) val authenticat
.asFlux()
.subscribe(ping => context.outbound.emitNext(ping, FAIL_FAST))
- SMono(
- eventBus.register(
- StateChangeListener(options.types, context.outbound),
- AccountIdRegistrationKey.of(session.getUser)))
+ SMono.just(session.getUser)
+ .concatWith(SFlux.fromPublisher(delegationStore.delegatedUsers(session.getUser)))
+ .map(username => AccountIdRegistrationKey.of(username).asInstanceOf[RegistrationKey])
+ .collectSeq()
+ .flatMap(keys => SMono(eventBus.register(StateChangeListener(options.types, context.outbound), keys.asJavaCollection)))
.doOnNext(newRegistration => context.withRegistration(newRegistration))
.subscribeOn(Schedulers.boundedElastic())
.subscribe()
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
index 8e232297cd..b4eaf4a27a 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/routes/WebSocketRoutes.scala
@@ -28,19 +28,20 @@ import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.websocketx.WebSocketFrame
import javax.inject.{Inject, Named}
import org.apache.james.core.Username
-import org.apache.james.events.{EventBus, Registration}
+import org.apache.james.events.{EventBus, Registration, RegistrationKey}
import org.apache.james.jmap.HttpConstants.JSON_CONTENT_TYPE
import org.apache.james.jmap.JMAPUrls.JMAP_WS
import org.apache.james.jmap.api.change.{EmailChangeRepository, MailboxChangeRepository, TypeStateFactory}
import org.apache.james.jmap.api.model.{AccountId => JavaAccountId}
-import org.apache.james.jmap.change.{AccountIdRegistrationKey, StateChangeListener, _}
-import org.apache.james.jmap.core.{OutboundMessage, ProblemDetails, RequestId, WebSocketError, WebSocketPushDisable, WebSocketPushEnable, WebSocketRequest, WebSocketResponse, _}
+import org.apache.james.jmap.change._
+import org.apache.james.jmap.core._
import org.apache.james.jmap.exceptions.UnauthorizedException
import org.apache.james.jmap.http.rfc8621.InjectionKeys
import org.apache.james.jmap.http.{Authenticator, UserProvisioning}
import org.apache.james.jmap.json.{PushSerializer, ResponseSerializer}
import org.apache.james.jmap.{Endpoint, JMAPRoute, JMAPRoutes, InjectionKeys => JMAPInjectionKeys}
import org.apache.james.mailbox.MailboxSession
+import org.apache.james.user.api.DelegationStore
import org.slf4j.{Logger, LoggerFactory}
import play.api.libs.json.Json
import reactor.core.publisher.Sinks.EmitFailureHandler.FAIL_FAST
@@ -50,6 +51,8 @@ import reactor.core.scheduler.Schedulers
import reactor.netty.http.server.{HttpServerRequest, HttpServerResponse}
import reactor.netty.http.websocket.{WebsocketInbound, WebsocketOutbound}
+import scala.jdk.CollectionConverters._
+
object WebSocketRoutes {
val LOGGER: Logger = LoggerFactory.getLogger(classOf[WebSocketRoutes])
}
@@ -75,7 +78,8 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
mailboxChangeRepository: MailboxChangeRepository,
emailChangeRepository: EmailChangeRepository,
pushSerializer: PushSerializer,
- typeStateFactory: TypeStateFactory) extends JMAPRoutes {
+ typeStateFactory: TypeStateFactory,
+ delegationStore: DelegationStore) extends JMAPRoutes {
override def routes(): stream.Stream[JMAPRoute] = stream.Stream.of(
JMAPRoute.builder
@@ -130,9 +134,13 @@ class WebSocketRoutes @Inject() (@Named(InjectionKeys.RFC_8621) val authenticato
.map[OutboundMessage](WebSocketResponse(request.id, _))
.onErrorResume(e => SMono.just(asError(request.id)(e)))
case pushEnable: WebSocketPushEnable =>
- SMono(eventBus.register(
+ SMono.just(clientContext.session.getUser)
+ .concatWith(SFlux.fromPublisher(delegationStore.delegatedUsers(clientContext.session.getUser)))
+ .map(username => AccountIdRegistrationKey.of(username).asInstanceOf[RegistrationKey])
+ .collectSeq()
+ .flatMap(keys => SMono(eventBus.register(
StateChangeListener(pushEnable.dataTypes.getOrElse(typeStateFactory.all.toSet), clientContext.outbound),
- AccountIdRegistrationKey.of(clientContext.session.getUser)))
+ keys.asJavaCollection)))
.doOnNext(newRegistration => clientContext.withRegistration(newRegistration))
.`then`(sendPushStateIfRequested(pushEnable, clientContext))
case WebSocketPushDisable => SMono.fromCallable(() => clientContext.clean())
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
index 5ee1bf0b04..bae016a02c 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/pushsubscription/PushListenerTest.scala
@@ -38,11 +38,13 @@ import org.apache.james.jmap.change.{EmailDeliveryTypeName, EmailTypeName, Mailb
import org.apache.james.jmap.core.{AccountId, PushState, StateChange, UuidState}
import org.apache.james.jmap.json.PushSerializer
import org.apache.james.jmap.memory.pushsubscription.MemoryPushSubscriptionRepository
+import org.apache.james.user.api.DelegationStore
+import org.apache.james.user.memory.MemoryDelegationStore
import org.assertj.core.api.Assertions.assertThat
import org.assertj.core.api.SoftAssertions
import org.junit.jupiter.api.{BeforeEach, Nested, Test}
import org.mockito.ArgumentMatchers.any
-import org.mockito.Mockito.{mock, verify, verifyNoInteractions, when}
+import org.mockito.Mockito.{mock, times, verify, verifyNoInteractions, when}
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import reactor.core.scala.publisher.SMono
@@ -52,11 +54,13 @@ class PushListenerTest {
val bob: Username = Username.of("bob@localhost")
val alice: Username = Username.of("alice@localhost")
val bobAccountId: String = "405010d6c16c16dec36f3d7a7596c2d757ba7b57904adc4801a63e40914fd5c9"
+ val aliceAccountId: String = "93c56f4408cff66f0a929aea8e3940e753c3275e5622582ae3010e7277b7696c"
val url: PushSubscriptionServerURL = PushSubscriptionServerURL.from("http://localhost:9999/push").get
var testee: PushListener = _
var pushSubscriptionRepository: PushSubscriptionRepository = _
var webPushClient: WebPushClient = _
+ var delegationStore: DelegationStore = _
@BeforeEach
def setUp(): Unit = {
@@ -64,7 +68,8 @@ class PushListenerTest {
pushSubscriptionRepository = new MemoryPushSubscriptionRepository(Clock.systemUTC())
webPushClient = mock(classOf[WebPushClient])
- testee = new PushListener(pushSubscriptionRepository, webPushClient, pushSerializer)
+ delegationStore = new MemoryDelegationStore()
+ testee = new PushListener(pushSubscriptionRepository, webPushClient, pushSerializer, delegationStore)
when(webPushClient.push(any(), any())).thenReturn(SMono.empty[Unit])
}
@@ -126,6 +131,73 @@ class PushListenerTest {
})
}
+ @Test
+ def shouldNotPushAliceChangesToBobWhenBobIsNotDelegatedByAlice(): Unit = {
+   // Bob saves and validates a subscription; this test adds nothing to the delegation store.
+   val creationRequest = PushSubscriptionCreationRequest(
+     deviceClientId = DeviceClientId("junit1"),
+     url = url,
+     types = Seq(EmailTypeName, MailboxTypeName))
+   val savedSubscription = SMono(pushSubscriptionRepository.save(bob, creationRequest)).block()
+   SMono(pushSubscriptionRepository.validateVerificationCode(bob, savedSubscription.id)).block()
+
+   // Dispatch a state change whose username is alice.
+   val aliceEmailState = UuidState(UUID.randomUUID())
+   val aliceEvent = StateChangeEvent(EventId.random(), alice, Map(EmailTypeName -> aliceEmailState))
+   SMono(testee.reactiveEvent(aliceEvent)).block()
+
+   // The URL registered by bob must never have been pushed to.
+   verify(webPushClient, times(0)).push(ArgumentMatchers.eq(url), any())
+ }
+
+ @Test
+ def shouldPushAliceChangesToAliceAndBobWhenBobIsDelegatedByAlice(): Unit = {
+   SMono.fromPublisher(delegationStore.addAuthorizedUser(alice, bob)).block()
+
+   // Each subscription gets its own URL so that the pushes can be attributed to a recipient:
+   // with a single shared URL and identical payloads, two pushes to the same subscription
+   // would be indistinguishable from one push to bob plus one push to alice.
+   val bobUrl: PushSubscriptionServerURL = PushSubscriptionServerURL.from("http://localhost:9999/push/bob").get
+   val aliceUrl: PushSubscriptionServerURL = PushSubscriptionServerURL.from("http://localhost:9999/push/alice").get
+
+   val bobSubscriptionId = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(
+     deviceClientId = DeviceClientId("junit1"),
+     url = bobUrl,
+     types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+   SMono(pushSubscriptionRepository.validateVerificationCode(bob, bobSubscriptionId)).block()
+   val aliceSubscriptionId = SMono(pushSubscriptionRepository.save(alice, PushSubscriptionCreationRequest(
+     deviceClientId = DeviceClientId("junit2"),
+     url = aliceUrl,
+     types = Seq(EmailTypeName, MailboxTypeName)))).block().id
+   SMono(pushSubscriptionRepository.validateVerificationCode(alice, aliceSubscriptionId)).block()
+
+   val state1 = UuidState(UUID.randomUUID())
+   SMono(testee.reactiveEvent(StateChangeEvent(EventId.random(), alice, Map(EmailTypeName -> state1)))).block()
+
+   // Exactly one push per recipient URL (verify defaults to times(1)).
+   val bobCaptor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+   val aliceCaptor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+   verify(webPushClient).push(ArgumentMatchers.eq(bobUrl), bobCaptor.capture())
+   verify(webPushClient).push(ArgumentMatchers.eq(aliceUrl), aliceCaptor.capture())
+
+   // Both recipients are told about the change under alice's account id.
+   val expectedPayload = s"""{"@type":"StateChange","changed":{"$aliceAccountId":{"Email":"${state1.value.toString}"}}}"""
+   SoftAssertions.assertSoftly(softly => {
+     softly.assertThat(new String(bobCaptor.getValue.payload, StandardCharsets.UTF_8))
+       .isEqualTo(expectedPayload)
+     softly.assertThat(new String(aliceCaptor.getValue.payload, StandardCharsets.UTF_8))
+       .isEqualTo(expectedPayload)
+   })
+ }
+
+ @Test
+ def bobShouldReceiveHisChangesAndAliceChangesWhenBobIsDelegatedByAlice(): Unit = {
+   // Register bob as an authorized user of alice in the delegation store.
+   SMono.fromPublisher(delegationStore.addAuthorizedUser(alice, bob)).block()
+
+   // Only bob owns a (validated) subscription in this scenario.
+   val creationRequest = PushSubscriptionCreationRequest(
+     deviceClientId = DeviceClientId("junit1"),
+     url = url,
+     types = Seq(EmailTypeName, MailboxTypeName))
+   val savedSubscription = SMono(pushSubscriptionRepository.save(bob, creationRequest)).block()
+   SMono(pushSubscriptionRepository.validateVerificationCode(bob, savedSubscription.id)).block()
+
+   // Events are dispatched one after the other: bob's first, then alice's.
+   val bobEmailState = UuidState(UUID.randomUUID())
+   val bobEvent = StateChangeEvent(EventId.random(), bob, Map(EmailTypeName -> bobEmailState))
+   SMono(testee.reactiveEvent(bobEvent)).block()
+   val aliceEmailState = UuidState(UUID.randomUUID())
+   val aliceEvent = StateChangeEvent(EventId.random(), alice, Map(EmailTypeName -> aliceEmailState))
+   SMono(testee.reactiveEvent(aliceEvent)).block()
+
+   val argumentCaptor: ArgumentCaptor[PushRequest] = ArgumentCaptor.forClass(classOf[PushRequest])
+   verify(webPushClient, times(2)).push(ArgumentMatchers.eq(url), argumentCaptor.capture())
+
+   // Decodes the payload of the index-th captured push request as UTF-8 text.
+   def payloadAt(index: Int): String =
+     new String(argumentCaptor.getAllValues.get(index).payload, StandardCharsets.UTF_8)
+
+   SoftAssertions.assertSoftly(softly => {
+     softly.assertThat(payloadAt(0))
+       .isEqualTo(s"""{"@type":"StateChange","changed":{"$bobAccountId":{"Email":"${bobEmailState.value.toString}"}}}""")
+     softly.assertThat(payloadAt(1))
+       .isEqualTo(s"""{"@type":"StateChange","changed":{"$aliceAccountId":{"Email":"${aliceEmailState.value.toString}"}}}""")
+   })
+ }
+
@Test
def unwantedTypesShouldBeFilteredOut(): Unit = {
val id = SMono(pushSubscriptionRepository.save(bob, PushSubscriptionCreationRequest(
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org