You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/11/27 07:10:17 UTC
[james-project] 05/05: JAMES-3861 Propagate EmailDelivery push selection to JMAP level
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit f0da59b65a846d9c2aae8b6a5f65c92150ad93a8
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Wed Nov 23 16:48:03 2022 +0700
JAMES-3861 Propagate EmailDelivery push selection to JMAP level
---
.../apache/james/jmap/api/change/EmailChange.java | 24 ++-
.../jmap/api/change/MailboxAndEmailChange.java | 2 +
.../rfc8621/contract/EventSourceContract.scala | 206 +++++++++++++++++-
.../jmap/rfc8621/contract/WebPushContract.scala | 234 ++++++++++++++++++++-
.../jmap/rfc8621/contract/WebSocketContract.scala | 114 +++++++++-
.../james/jmap/change/MailboxChangeListener.scala | 2 +-
6 files changed, 562 insertions(+), 20 deletions(-)
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
index e22d5c2ca9..6c51f6e2fe 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
@@ -23,6 +23,7 @@ import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import java.util.Optional;
import org.apache.james.jmap.api.model.AccountId;
import org.apache.james.mailbox.model.MessageId;
@@ -60,6 +61,7 @@ public class EmailChange implements JmapChange {
private final ImmutableList.Builder<MessageId> created;
private final ImmutableList.Builder<MessageId> updated;
private final ImmutableList.Builder<MessageId> destroyed;
+ private Optional<Boolean> isDelivery;
private Builder(AccountId accountId, State state, ZonedDateTime date, boolean isDelegated) {
Preconditions.checkNotNull(accountId, "'accountId' should not be null");
@@ -73,6 +75,7 @@ public class EmailChange implements JmapChange {
this.destroyed = ImmutableList.builder();
this.updated = ImmutableList.builder();
this.created = ImmutableList.builder();
+ this.isDelivery = Optional.empty();
}
public Builder updated(MessageId... messageId) {
@@ -105,8 +108,13 @@ public class EmailChange implements JmapChange {
return this;
}
+ public Builder isDelivery(boolean isDelivery) {
+ this.isDelivery = Optional.of(isDelivery);
+ return this;
+ }
+
public EmailChange build() {
- return new EmailChange(accountId, state, date, isDelegated, created.build(), updated.build(), destroyed.build());
+ return new EmailChange(accountId, state, date, isDelegated, created.build(), updated.build(), destroyed.build(), isDelivery.orElse(false));
}
}
@@ -121,8 +129,9 @@ public class EmailChange implements JmapChange {
private final ImmutableList<MessageId> created;
private final ImmutableList<MessageId> updated;
private final ImmutableList<MessageId> destroyed;
+ private final boolean isDelivery;
- private EmailChange(AccountId accountId, State state, ZonedDateTime date, boolean isDelegated, ImmutableList<MessageId> created, ImmutableList<MessageId> updated, ImmutableList<MessageId> destroyed) {
+ private EmailChange(AccountId accountId, State state, ZonedDateTime date, boolean isDelegated, ImmutableList<MessageId> created, ImmutableList<MessageId> updated, ImmutableList<MessageId> destroyed, boolean isDelivery) {
this.accountId = accountId;
this.state = state;
this.date = date;
@@ -130,6 +139,7 @@ public class EmailChange implements JmapChange {
this.created = created;
this.updated = updated;
this.destroyed = destroyed;
+ this.isDelivery = isDelivery;
}
public AccountId getAccountId() {
@@ -160,6 +170,10 @@ public class EmailChange implements JmapChange {
return isDelegated;
}
+ public boolean isDelivery() {
+ return isDelivery;
+ }
+
@Override
public final boolean equals(Object o) {
if (o instanceof EmailChange) {
@@ -170,14 +184,15 @@ public class EmailChange implements JmapChange {
&& Objects.equals(isDelegated, that.isDelegated)
&& Objects.equals(created, that.created)
&& Objects.equals(updated, that.updated)
- && Objects.equals(destroyed, that.destroyed);
+ && Objects.equals(destroyed, that.destroyed)
+ && Objects.equals(isDelivery, that.isDelivery);
}
return false;
}
@Override
public final int hashCode() {
- return Objects.hash(accountId, state, date, isDelegated, created, updated, destroyed);
+ return Objects.hash(accountId, state, date, isDelegated, created, updated, destroyed, isDelivery);
}
@Override
@@ -190,6 +205,7 @@ public class EmailChange implements JmapChange {
.add("created", created)
.add("updated", updated)
.add("destroyed", destroyed)
+ .add("isDelivery", isDelivery)
.toString();
}
}
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
index 23a2d94369..fe6761987a 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
@@ -61,6 +61,7 @@ public class MailboxAndEmailChange implements JmapChange {
.state(state)
.date(now)
.isDelegated(false)
+ .isDelivery(messageAdded.isDelivery())
.created(messageAdded.getMessageIds())
.build();
@@ -82,6 +83,7 @@ public class MailboxAndEmailChange implements JmapChange {
.state(state)
.date(now)
.isDelegated(true)
+ .isDelivery(messageAdded.isDelivery())
.created(messageAdded.getMessageIds())
.build(),
MailboxChange.builder()
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
index fea75f9e61..1d24f53c5c 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
@@ -20,15 +20,27 @@
package org.apache.james.jmap.rfc8621.contract
import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT
import io.netty.handler.codec.http.HttpResponseStatus
+import io.restassured.RestAssured.{`given`, requestSpecification}
+import org.apache.http.HttpStatus.SC_OK
import org.apache.james.GuiceJamesServer
import org.apache.james.jmap.draft.JmapGuiceProbe
-import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN}
-import org.apache.james.mailbox.model.MailboxPath
+import org.apache.james.jmap.http.UserCredential
+import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
+import org.apache.james.mailbox.DefaultMailboxes
+import org.apache.james.mailbox.MessageManager.AppendCommand
+import org.apache.james.mailbox.model.{MailboxPath, MessageId}
+import org.apache.james.mime4j.dom.Message
import org.apache.james.modules.MailboxProbeImpl
-import org.apache.james.utils.DataProbeImpl
+import org.apache.james.modules.protocols.SmtpGuiceProbe
+import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe}
import org.assertj.core.api.Assertions.assertThat
+import org.awaitility.Awaitility
+import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS
+import org.awaitility.core.ConditionFactory
import org.junit.jupiter.api.{BeforeEach, Test}
import reactor.core.scheduler.Schedulers
import reactor.netty.http.client.HttpClient
@@ -37,6 +49,12 @@ import scala.collection.mutable.ListBuffer
import scala.jdk.CollectionConverters._
trait EventSourceContract {
+ private lazy val awaitAtMostTenSeconds: ConditionFactory = Awaitility.`with`
+ .pollInterval(ONE_HUNDRED_MILLISECONDS)
+ .and.`with`.pollDelay(ONE_HUNDRED_MILLISECONDS)
+ .await
+ .atMost(10, TimeUnit.SECONDS)
+
@BeforeEach
def setUp(server: GuiceJamesServer): Unit = {
server.getProbe(classOf[DataProbeImpl])
@@ -44,6 +62,11 @@ trait EventSourceContract {
.addDomain(DOMAIN.asString())
.addUser(ANDRE.asString(), ANDRE_PASSWORD)
.addUser(BOB.asString(), BOB_PASSWORD)
+
+ requestSpecification = baseRequestSpecBuilder(server)
+ .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
+ .addHeader(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .build()
}
@Test
@@ -259,6 +282,156 @@ trait EventSourceContract {
assertThat(seq.head).endsWith("\n\n")
}
+ @Test
+ def shouldPushEmailDeliveryChangeWhenUserReceivesEmail(server: GuiceJamesServer): Unit = {
+ val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+ val seq = new ListBuffer[String]()
+ HttpClient.create
+ .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Mailbox,Email,VacationResponse,Thread,Identity,EmailSubmission,EmailDelivery&ping=0&closeAfter=no")
+ .headers(builder => {
+ builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+ builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+ })
+ .get()
+ .responseContent()
+ .map(bb => {
+ val bytes = new Array[Byte](bb.readableBytes)
+ bb.readBytes(bytes)
+ new String(bytes, StandardCharsets.UTF_8)
+ })
+ .doOnNext(seq.addOne)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe()
+
+ // Bob receives a mail
+ Thread.sleep(500)
+ sendEmailToBob(server)
+
+ awaitAtMostTenSeconds.untilAsserted(() => {
+ assertThat(seq.asJava)
+ .hasSize(1)
+ assertThat(seq.head)
+ .contains("EmailDelivery")
+ })
+ }
+
+ @Test
+ def shouldNotPushEmailDeliveryChangeWhenUserCreatesDraftEmail(server: GuiceJamesServer): Unit = {
+ val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+ val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+ val seq = new ListBuffer[String]()
+ HttpClient.create
+ .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Mailbox,Email,VacationResponse,Thread,Identity,EmailSubmission,EmailDelivery&ping=0&closeAfter=no")
+ .headers(builder => {
+ builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+ builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+ })
+ .get()
+ .responseContent()
+ .map(bb => {
+ val bytes = new Array[Byte](bb.readableBytes)
+ bb.readBytes(bytes)
+ new String(bytes, StandardCharsets.UTF_8)
+ })
+ .doOnNext(seq.addOne)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe()
+
+ // Bob creates a draft mail
+ Thread.sleep(500)
+ val request =
+ s"""{
+ | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+ | "methodCalls": [
+ | ["Email/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "create": {
+ | "aaaaaa":{
+ | "mailboxIds": {
+ | "${mailboxId.serialize}": true
+ | },
+ | "to": [{"email": "rcpt1@apache.org"}, {"email": "rcpt2@apache.org"}],
+ | "from": [{"email": "${BOB.asString}"}]
+ | }
+ | }
+ | }, "c1"]]
+ |}""".stripMargin
+
+ `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+
+ awaitAtMostTenSeconds.untilAsserted(() => {
+ assertThat(seq.asJava)
+ .hasSize(1)
+ assertThat(seq.head)
+ .doesNotContain("EmailDelivery")
+ })
+ }
+
+ @Test
+ def shouldNotPushEmailDeliveryChangeWhenUserSendsEmail(server: GuiceJamesServer): Unit = {
+ val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+ val messageId: MessageId = prepareDraftMessage(server)
+
+ val seq = new ListBuffer[String]()
+ HttpClient.create
+ .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Mailbox,Email,VacationResponse,Thread,Identity,EmailSubmission,EmailDelivery&ping=0&closeAfter=no")
+ .headers(builder => {
+ builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+ builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+ })
+ .get()
+ .responseContent()
+ .map(bb => {
+ val bytes = new Array[Byte](bb.readableBytes)
+ bb.readBytes(bytes)
+ new String(bytes, StandardCharsets.UTF_8)
+ })
+ .doOnNext(seq.addOne)
+ .subscribeOn(Schedulers.boundedElastic())
+ .subscribe()
+
+ // WHEN Bob sends an email to Andre
+ Thread.sleep(500)
+ val requestBob =
+ s"""{
+ | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:ietf:params:jmap:submission"],
+ | "methodCalls": [
+ | ["EmailSubmission/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "create": {
+ | "k1490": {
+ | "emailId": "${messageId.serialize}",
+ | "envelope": {
+ | "mailFrom": {"email": "${BOB.asString}"},
+ | "rcptTo": [{"email": "${ANDRE.asString}"}]
+ | }
+ | }
+ | }
+ | }, "c1"]]
+ |}""".stripMargin
+
+ `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(requestBob)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+
+ awaitAtMostTenSeconds.untilAsserted(() => assertThat(seq.asJava).hasSize(0))
+ }
+
@Test
def pingShouldBeSupported(server: GuiceJamesServer): Unit = {
val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
@@ -384,4 +557,31 @@ trait EventSourceContract {
assertThat(seq.asJava)
.hasSize(2)
}
+
+ private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+ val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
+ smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
+ .authenticate(ANDRE.asString, ANDRE_PASSWORD)
+ .sendMessage(ANDRE.asString, BOB.asString())
+ smtpMessageSender.close()
+
+ awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
+ }
+
+ private def prepareDraftMessage(server: GuiceJamesServer) = {
+ val message: Message = Message.Builder
+ .of
+ .setSubject("test")
+ .setSender(BOB.asString)
+ .setFrom(BOB.asString)
+ .setTo(ANDRE.asString)
+ .setBody("testmail", StandardCharsets.UTF_8)
+ .build
+ val bobDraftsPath = MailboxPath.forUser(BOB, DefaultMailboxes.DRAFTS)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobDraftsPath)
+ val messageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobDraftsPath, AppendCommand.builder()
+ .build(message))
+ .getMessageId
+ messageId
+ }
}
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala
index 33b9ef4c6d..599150875e 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala
@@ -19,6 +19,14 @@
package org.apache.james.jmap.rfc8621.contract
+import java.nio.charset.StandardCharsets
+import java.security.KeyPair
+import java.security.interfaces.{ECPrivateKey, ECPublicKey}
+import java.time.temporal.ChronoUnit
+import java.util.Base64
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
import com.google.crypto.tink.apps.webpush.WebPushHybridDecrypt
import com.google.crypto.tink.subtle.EllipticCurves
import com.google.crypto.tink.subtle.EllipticCurves.CurveType
@@ -33,7 +41,10 @@ import org.apache.james.jmap.core.ResponseObject.SESSION_STATE
import org.apache.james.jmap.http.UserCredential
import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
import org.apache.james.jmap.rfc8621.contract.tags.CategoryTags
-import org.apache.james.mailbox.model.MailboxPath
+import org.apache.james.mailbox.DefaultMailboxes
+import org.apache.james.mailbox.MessageManager.AppendCommand
+import org.apache.james.mailbox.model.{MailboxConstants, MailboxPath, MessageId}
+import org.apache.james.mime4j.dom.Message
import org.apache.james.modules.MailboxProbeImpl
import org.apache.james.modules.protocols.SmtpGuiceProbe
import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe, UpdatableTickingClock}
@@ -51,14 +62,6 @@ import org.mockserver.model.{HttpRequest, HttpResponse}
import org.mockserver.verify.VerificationTimes
import play.api.libs.json.{JsObject, JsString, Json}
-import java.nio.charset.StandardCharsets
-import java.security.KeyPair
-import java.security.interfaces.{ECPrivateKey, ECPublicKey}
-import java.time.temporal.ChronoUnit
-import java.util.Base64
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicReference
-
trait WebPushContract {
private lazy val awaitAtMostTenSeconds: ConditionFactory = Awaitility.`with`
.pollInterval(ONE_HUNDRED_MILLISECONDS)
@@ -239,6 +242,128 @@ trait WebPushContract {
}
}
+ @Test
+ def shouldPushEmailDeliveryChangeWhenUserReceivesEmail(server: GuiceJamesServer, pushServer: ClientAndServer): Unit = {
+ setupPushSubscriptionForBob(pushServer)
+
+ // WHEN bob receives a mail
+ sendEmailToBob(server)
+
+ // THEN bob has an EmailDelivery stateChange on the push gateway
+ awaitAtMostTenSeconds.untilAsserted { () =>
+ pushServer.verify(HttpRequest.request()
+ .withPath(PUSH_URL_PATH)
+ .withBody(json(
+ s"""{
+ | "@type": "StateChange",
+ | "changed": {
+ | "$ACCOUNT_ID": {
+ | "EmailDelivery": "$${json-unit.any-string}"
+ | }
+ | }
+ |}""".stripMargin)),
+ VerificationTimes.atLeast(1))
+ }
+ }
+
+ @Test
+ def shouldNotPushEmailDeliveryChangeWhenUserCreatesDraftEmail(server: GuiceJamesServer, pushServer: ClientAndServer): Unit = {
+ setupPushSubscriptionForBob(pushServer)
+
+ // WHEN bob creates a draft mail
+ val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).getMailboxId("#private", BOB.asString(), MailboxConstants.INBOX)
+ val request =
+ s"""{
+ | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+ | "methodCalls": [
+ | ["Email/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "create": {
+ | "aaaaaa":{
+ | "mailboxIds": {
+ | "${mailboxId.serialize}": true
+ | },
+ | "to": [{"email": "rcpt1@apache.org"}, {"email": "rcpt2@apache.org"}],
+ | "from": [{"email": "${BOB.asString}"}]
+ | }
+ | }
+ | }, "c1"]]
+ |}""".stripMargin
+
+ `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(request)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+
+ // THEN bob should not have an EmailDelivery stateChange on the push gateway
+ awaitAtMostTenSeconds.untilAsserted { () =>
+ pushServer.verify(HttpRequest.request()
+ .withPath(PUSH_URL_PATH)
+ .withBody(json(
+ s"""{
+ | "@type": "StateChange",
+ | "changed": {
+ | "$ACCOUNT_ID": {
+ | "EmailDelivery": "$${json-unit.any-string}"
+ | }
+ | }
+ |}""".stripMargin)),
+ VerificationTimes.never())
+ }
+ }
+
+ @Test
+ def shouldNotPushEmailDeliveryChangeWhenUserSendsEmail(server: GuiceJamesServer, pushServer: ClientAndServer): Unit = {
+ val messageId: MessageId = prepareDraftMessage(server)
+ setupPushSubscriptionForBob(pushServer)
+
+ // WHEN Bob sends an email to Andre
+ val requestBob =
+ s"""{
+ | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:ietf:params:jmap:submission"],
+ | "methodCalls": [
+ | ["EmailSubmission/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "create": {
+ | "k1490": {
+ | "emailId": "${messageId.serialize}",
+ | "envelope": {
+ | "mailFrom": {"email": "${BOB.asString}"},
+ | "rcptTo": [{"email": "${ANDRE.asString}"}]
+ | }
+ | }
+ | }
+ | }, "c1"]]
+ |}""".stripMargin
+
+ `given`
+ .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+ .body(requestBob)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+
+ // THEN bob should not have an EmailDelivery stateChange on the push gateway
+ awaitAtMostTenSeconds.untilAsserted { () =>
+ pushServer.verify(HttpRequest.request()
+ .withPath(PUSH_URL_PATH)
+ .withBody(json(
+ s"""{
+ | "@type": "StateChange",
+ | "changed": {
+ | "$ACCOUNT_ID": {
+ | "EmailDelivery": "$${json-unit.any-string}"
+ | }
+ | }
+ |}""".stripMargin)),
+ VerificationTimes.never())
+ }
+ }
+
@Test
def webPushShouldNotPushToPushServerWhenExpiredSubscription(server: GuiceJamesServer, pushServer: ClientAndServer, clock: UpdatableTickingClock): Unit = {
// Setup mock-server for callback
@@ -534,4 +659,95 @@ trait WebPushContract {
| }
|}""".stripMargin)
}
+
+ private def prepareDraftMessage(server: GuiceJamesServer) = {
+ val message: Message = Message.Builder
+ .of
+ .setSubject("test")
+ .setSender(BOB.asString)
+ .setFrom(BOB.asString)
+ .setTo(ANDRE.asString)
+ .setBody("testmail", StandardCharsets.UTF_8)
+ .build
+ val bobDraftsPath = MailboxPath.forUser(BOB, DefaultMailboxes.DRAFTS)
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobDraftsPath)
+ val messageId: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobDraftsPath, AppendCommand.builder()
+ .build(message))
+ .getMessageId
+ messageId
+ }
+
+ private def setupPushSubscriptionForBob(pushServer: ClientAndServer) = {
+ // Setup mock-server for callback
+ val bodyRequestOnPushServer: AtomicReference[String] = setupPushServerCallback(pushServer)
+
+ // WHEN bob creates a push subscription
+ val pushSubscriptionId: String = `given`
+ .body(
+ s"""{
+ | "using": ["urn:ietf:params:jmap:core"],
+ | "methodCalls": [
+ | [
+ | "PushSubscription/set",
+ | {
+ | "create": {
+ | "4f29": {
+ | "deviceClientId": "a889-ffea-910",
+ | "url": "${getPushServerUrl(pushServer)}",
+ | "types": ["EmailDelivery"]
+ | }
+ | }
+ | },
+ | "c1"
+ | ]
+ | ]
+ | }""".stripMargin)
+ .when
+ .post
+ .`then`
+ .statusCode(SC_OK)
+ .extract()
+ .jsonPath()
+ .get("methodResponses[0][1].created.4f29.id")
+
+ // THEN a validation code is sent
+ awaitAtMostTenSeconds.untilAsserted { () =>
+ pushServer.verify(HttpRequest.request()
+ .withPath(PUSH_URL_PATH)
+ .withBody(json(
+ s"""{
+ | "@type": "PushVerification",
+ | "pushSubscriptionId": "$pushSubscriptionId",
+ | "verificationCode": "$${json-unit.any-string}"
+ |}""".stripMargin)),
+ VerificationTimes.atLeast(1))
+ }
+
+ // GIVEN bob retrieves the validation code from the mock server
+ val verificationCode: String = Json.parse(bodyRequestOnPushServer.get()).asInstanceOf[JsObject]
+ .value("verificationCode")
+ .asInstanceOf[JsString]
+ .value
+
+ // WHEN bob updates the validation code via JMAP
+ val updateVerificationCodeResponse: String = updateValidateVerificationCode(pushSubscriptionId, verificationCode)
+
+ // THEN it succeeds
+ assertThatJson(updateVerificationCodeResponse)
+ .isEqualTo(
+ s"""{
+ | "sessionState": "${SESSION_STATE.value}",
+ | "methodResponses": [
+ | [
+ | "PushSubscription/set",
+ | {
+ | "updated": {
+ | "$pushSubscriptionId": {}
+ | }
+ | },
+ | "c1"
+ | ]
+ | ]
+ |}""".stripMargin)
+ }
}
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index 0cf1d3280c..6980fc1e71 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -20,6 +20,7 @@ package org.apache.james.jmap.rfc8621.contract
import java.net.{ProtocolException, URI}
import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
import org.apache.james.GuiceJamesServer
@@ -32,9 +33,13 @@ import org.apache.james.mailbox.MessageManager.AppendCommand
import org.apache.james.mailbox.model.MailboxACL.Right
import org.apache.james.mailbox.model.{MailboxACL, MailboxPath}
import org.apache.james.mime4j.dom.Message
+import org.apache.james.modules.protocols.SmtpGuiceProbe
import org.apache.james.modules.{ACLProbeImpl, MailboxProbeImpl}
-import org.apache.james.utils.DataProbeImpl
+import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe}
import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.awaitility.Awaitility
+import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS
+import org.awaitility.core.ConditionFactory
import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
import play.api.libs.json.{JsString, Json}
import reactor.core.scala.publisher.SMono
@@ -52,6 +57,11 @@ import sttp.ws.{WebSocket, WebSocketFrame}
import scala.jdk.CollectionConverters._
trait WebSocketContract {
+ private lazy val awaitAtMostTenSeconds: ConditionFactory = Awaitility.`with`
+ .pollInterval(ONE_HUNDRED_MILLISECONDS)
+ .and.`with`.pollDelay(ONE_HUNDRED_MILLISECONDS)
+ .await
+ .atMost(10, TimeUnit.SECONDS)
private lazy val backend: SttpBackend[Identity, WebSockets] = OkHttpSyncBackend()
private lazy implicit val monadError: MonadError[Identity] = IdMonad
@@ -645,7 +655,7 @@ trait WebSocketContract {
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
- val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+ val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
.hasSize(2) // state change notification + API response
@@ -829,13 +839,101 @@ trait WebSocketContract {
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
- val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+ val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
.hasSize(2) // state change notification + API response
.contains(stateChange)
}
+ @Test
+ @Timeout(180)
+ def shouldPushEmailDeliveryChangeWhenUserReceivesEmail(server: GuiceJamesServer): Unit = {
+ server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": null
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ // Andre sends mail to Bob
+ sendEmailToBob(server)
+
+ List(
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ })
+ })
+ .send(backend)
+ .body
+
+ // Bob should receive EmailDelivery state change
+ assertThat(response.toOption.get.asJava)
+ .hasSize(1) // state change notification
+ .allMatch(s => s.contains("EmailDelivery"))
+ }
+
+ @Test
+ @Timeout(180)
+ def shouldNotPushEmailDeliveryChangeWhenCreateDraftMail(server: GuiceJamesServer): Unit = {
+ val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+ val response: Either[String, List[String]] =
+ authenticatedRequest(server)
+ .response(asWebSocket[Identity, List[String]] {
+ ws =>
+ ws.send(WebSocketFrame.text(
+ """{
+ | "@type": "WebSocketPushEnable",
+ | "dataTypes": null
+ |}""".stripMargin))
+
+ Thread.sleep(100)
+
+ ws.send(WebSocketFrame.text(
+ s"""{
+ | "@type": "Request",
+ | "id": "req-36",
+ | "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+ | "methodCalls": [
+ | ["Email/set", {
+ | "accountId": "$ACCOUNT_ID",
+ | "create": {
+ | "aaaaaa":{
+ | "mailboxIds": {
+ | "${mailboxId.serialize}": true
+ | }
+ | }
+ | }
+ | }, "c1"]]
+ |}""".stripMargin))
+
+ List(
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ },
+ ws.receive()
+ .map { case t: Text =>
+ t.payload
+ })
+ })
+ .send(backend)
+ .body
+
+ assertThat(response.toOption.get.asJava)
+ .hasSize(2) // state change notification + API response
+ .noneMatch(s => s.contains("EmailDelivery"))
+ }
+
@Test
@Timeout(180)
def pushEnableShouldUpdatePreviousSubscriptions(server: GuiceJamesServer): Unit = {
@@ -1124,4 +1222,14 @@ trait WebSocketContract {
basicRequest.get(Uri.apply(new URI(s"ws://127.0.0.1:$port/jmap/ws")))
.header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
}
+
+ private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+ val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
+ smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
+ .authenticate(ANDRE.asString, ANDRE_PASSWORD)
+ .sendMessage(ANDRE.asString, BOB.asString())
+ smtpMessageSender.close()
+
+ awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
+ }
}
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 600437e738..7d3b1bb020 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -139,7 +139,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
private def emailStateMap(emailChange: EmailChange): Map[TypeName, State] =
(Map(EmailTypeName -> UuidState.fromJava(emailChange.getState)) ++
Some(UuidState.fromJava(emailChange.getState))
- .filter(_ => !emailChange.getCreated.isEmpty)
+ .filter(_ => emailChange.isDelivery && !emailChange.getCreated.isEmpty)
.map(emailDeliveryState => Map(EmailDeliveryTypeName -> emailDeliveryState))
.getOrElse(Map())).toMap
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org