You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/11/27 07:10:17 UTC

[james-project] 05/05: JAMES-3861 Propagate EmailDelivery push selection to JMAP level

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit f0da59b65a846d9c2aae8b6a5f65c92150ad93a8
Author: Quan Tran <hq...@linagora.com>
AuthorDate: Wed Nov 23 16:48:03 2022 +0700

    JAMES-3861 Propagate EmailDelivery push selection to JMAP level
---
 .../apache/james/jmap/api/change/EmailChange.java  |  24 ++-
 .../jmap/api/change/MailboxAndEmailChange.java     |   2 +
 .../rfc8621/contract/EventSourceContract.scala     | 206 +++++++++++++++++-
 .../jmap/rfc8621/contract/WebPushContract.scala    | 234 ++++++++++++++++++++-
 .../jmap/rfc8621/contract/WebSocketContract.scala  | 114 +++++++++-
 .../james/jmap/change/MailboxChangeListener.scala  |   2 +-
 6 files changed, 562 insertions(+), 20 deletions(-)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
index e22d5c2ca9..6c51f6e2fe 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
@@ -23,6 +23,7 @@ import java.time.ZonedDateTime;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import java.util.Optional;
 
 import org.apache.james.jmap.api.model.AccountId;
 import org.apache.james.mailbox.model.MessageId;
@@ -60,6 +61,7 @@ public class EmailChange implements JmapChange {
         private final ImmutableList.Builder<MessageId> created;
         private final ImmutableList.Builder<MessageId> updated;
         private final ImmutableList.Builder<MessageId> destroyed;
+        private Optional<Boolean> isDelivery;
 
         private Builder(AccountId accountId, State state, ZonedDateTime date, boolean isDelegated) {
             Preconditions.checkNotNull(accountId, "'accountId' should not be null");
@@ -73,6 +75,7 @@ public class EmailChange implements JmapChange {
             this.destroyed = ImmutableList.builder();
             this.updated = ImmutableList.builder();
             this.created = ImmutableList.builder();
+            this.isDelivery = Optional.empty();
         }
 
         public Builder updated(MessageId... messageId) {
@@ -105,8 +108,13 @@ public class EmailChange implements JmapChange {
             return this;
         }
 
+        public Builder isDelivery(boolean isDelivery) {
+            this.isDelivery = Optional.of(isDelivery);
+            return this;
+        }
+
         public EmailChange build() {
-            return new EmailChange(accountId, state, date, isDelegated, created.build(), updated.build(), destroyed.build());
+            return new EmailChange(accountId, state, date, isDelegated, created.build(), updated.build(), destroyed.build(), isDelivery.orElse(false));
         }
     }
 
@@ -121,8 +129,9 @@ public class EmailChange implements JmapChange {
     private final ImmutableList<MessageId> created;
     private final ImmutableList<MessageId> updated;
     private final ImmutableList<MessageId> destroyed;
+    private final boolean isDelivery;
 
-    private EmailChange(AccountId accountId, State state, ZonedDateTime date, boolean isDelegated, ImmutableList<MessageId> created, ImmutableList<MessageId> updated, ImmutableList<MessageId> destroyed) {
+    private EmailChange(AccountId accountId, State state, ZonedDateTime date, boolean isDelegated, ImmutableList<MessageId> created, ImmutableList<MessageId> updated, ImmutableList<MessageId> destroyed, boolean isDelivery) {
         this.accountId = accountId;
         this.state = state;
         this.date = date;
@@ -130,6 +139,7 @@ public class EmailChange implements JmapChange {
         this.created = created;
         this.updated = updated;
         this.destroyed = destroyed;
+        this.isDelivery = isDelivery;
     }
 
     public AccountId getAccountId() {
@@ -160,6 +170,10 @@ public class EmailChange implements JmapChange {
         return isDelegated;
     }
 
+    public boolean isDelivery() {
+        return isDelivery;
+    }
+
     @Override
     public final boolean equals(Object o) {
         if (o instanceof EmailChange) {
@@ -170,14 +184,15 @@ public class EmailChange implements JmapChange {
                 && Objects.equals(isDelegated, that.isDelegated)
                 && Objects.equals(created, that.created)
                 && Objects.equals(updated, that.updated)
-                && Objects.equals(destroyed, that.destroyed);
+                && Objects.equals(destroyed, that.destroyed)
+                && Objects.equals(isDelivery, that.isDelivery);
         }
         return false;
     }
 
     @Override
     public final int hashCode() {
-        return Objects.hash(accountId, state, date, isDelegated, created, updated, destroyed);
+        return Objects.hash(accountId, state, date, isDelegated, created, updated, destroyed, isDelivery);
     }
 
     @Override
@@ -190,6 +205,7 @@ public class EmailChange implements JmapChange {
             .add("created", created)
             .add("updated", updated)
             .add("destroyed", destroyed)
+            .add("isDelivery", isDelivery)
             .toString();
     }
 }
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
index 23a2d94369..fe6761987a 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
@@ -61,6 +61,7 @@ public class MailboxAndEmailChange implements JmapChange {
                 .state(state)
                 .date(now)
                 .isDelegated(false)
+                .isDelivery(messageAdded.isDelivery())
                 .created(messageAdded.getMessageIds())
                 .build();
 
@@ -82,6 +83,7 @@ public class MailboxAndEmailChange implements JmapChange {
                             .state(state)
                             .date(now)
                             .isDelegated(true)
+                            .isDelivery(messageAdded.isDelivery())
                             .created(messageAdded.getMessageIds())
                             .build(),
                         MailboxChange.builder()
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
index fea75f9e61..1d24f53c5c 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/EventSourceContract.scala
@@ -20,15 +20,27 @@
 package org.apache.james.jmap.rfc8621.contract
 
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
 
+import io.netty.handler.codec.http.HttpHeaderNames.ACCEPT
 import io.netty.handler.codec.http.HttpResponseStatus
+import io.restassured.RestAssured.{`given`, requestSpecification}
+import org.apache.http.HttpStatus.SC_OK
 import org.apache.james.GuiceJamesServer
 import org.apache.james.jmap.draft.JmapGuiceProbe
-import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN}
-import org.apache.james.mailbox.model.MailboxPath
+import org.apache.james.jmap.http.UserCredential
+import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
+import org.apache.james.mailbox.DefaultMailboxes
+import org.apache.james.mailbox.MessageManager.AppendCommand
+import org.apache.james.mailbox.model.{MailboxPath, MessageId}
+import org.apache.james.mime4j.dom.Message
 import org.apache.james.modules.MailboxProbeImpl
-import org.apache.james.utils.DataProbeImpl
+import org.apache.james.modules.protocols.SmtpGuiceProbe
+import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe}
 import org.assertj.core.api.Assertions.assertThat
+import org.awaitility.Awaitility
+import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS
+import org.awaitility.core.ConditionFactory
 import org.junit.jupiter.api.{BeforeEach, Test}
 import reactor.core.scheduler.Schedulers
 import reactor.netty.http.client.HttpClient
@@ -37,6 +49,12 @@ import scala.collection.mutable.ListBuffer
 import scala.jdk.CollectionConverters._
 
 trait EventSourceContract {
+  private lazy val awaitAtMostTenSeconds: ConditionFactory = Awaitility.`with`
+    .pollInterval(ONE_HUNDRED_MILLISECONDS)
+    .and.`with`.pollDelay(ONE_HUNDRED_MILLISECONDS)
+    .await
+    .atMost(10, TimeUnit.SECONDS)
+
   @BeforeEach
   def setUp(server: GuiceJamesServer): Unit = {
     server.getProbe(classOf[DataProbeImpl])
@@ -44,6 +62,11 @@ trait EventSourceContract {
       .addDomain(DOMAIN.asString())
       .addUser(ANDRE.asString(), ANDRE_PASSWORD)
       .addUser(BOB.asString(), BOB_PASSWORD)
+
+    requestSpecification = baseRequestSpecBuilder(server)
+      .setAuth(authScheme(UserCredential(BOB, BOB_PASSWORD)))
+      .addHeader(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .build()
   }
 
   @Test
@@ -259,6 +282,156 @@ trait EventSourceContract {
     assertThat(seq.head).endsWith("\n\n")
   }
 
+  @Test
+  def shouldPushEmailDeliveryChangeWhenUserReceivesEmail(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Mailbox,Email,VacationResponse,Thread,Identity,EmailSubmission,EmailDelivery&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.boundedElastic())
+      .subscribe()
+
+    // Bob receives a mail
+    Thread.sleep(500)
+    sendEmailToBob(server)
+
+    awaitAtMostTenSeconds.untilAsserted(() => {
+      assertThat(seq.asJava)
+        .hasSize(1)
+      assertThat(seq.head)
+        .contains("EmailDelivery")
+    })
+  }
+
+  @Test
+  def shouldNotPushEmailDeliveryChangeWhenUserCreatesDraftEmail(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+    val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Mailbox,Email,VacationResponse,Thread,Identity,EmailSubmission,EmailDelivery&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.boundedElastic())
+      .subscribe()
+
+    // Bob creates a draft mail
+    Thread.sleep(500)
+    val request =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [
+         |    ["Email/set", {
+         |      "accountId": "$ACCOUNT_ID",
+         |      "create": {
+         |        "aaaaaa":{
+         |          "mailboxIds": {
+         |             "${mailboxId.serialize}": true
+         |          },
+         |          "to": [{"email": "rcpt1@apache.org"}, {"email": "rcpt2@apache.org"}],
+         |          "from": [{"email": "${BOB.asString}"}]
+         |        }
+         |      }
+         |    }, "c1"]]
+         |}""".stripMargin
+
+    `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+
+    awaitAtMostTenSeconds.untilAsserted(() => {
+      assertThat(seq.asJava)
+        .hasSize(1)
+      assertThat(seq.head)
+        .doesNotContain("EmailDelivery")
+    })
+  }
+
+  @Test
+  def shouldNotPushEmailDeliveryChangeWhenUserSendsEmail(server: GuiceJamesServer): Unit = {
+    val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+    val messageId: MessageId = prepareDraftMessage(server)
+
+    val seq = new ListBuffer[String]()
+    HttpClient.create
+      .baseUrl(s"http://127.0.0.1:$port/eventSource?types=Mailbox,Email,VacationResponse,Thread,Identity,EmailSubmission,EmailDelivery&ping=0&closeAfter=no")
+      .headers(builder => {
+        builder.add("Authorization", "Basic Ym9iQGRvbWFpbi50bGQ6Ym9icGFzc3dvcmQ=")
+        builder.add("Accept", ACCEPT_RFC8621_VERSION_HEADER)
+      })
+      .get()
+      .responseContent()
+      .map(bb => {
+        val bytes = new Array[Byte](bb.readableBytes)
+        bb.readBytes(bytes)
+        new String(bytes, StandardCharsets.UTF_8)
+      })
+      .doOnNext(seq.addOne)
+      .subscribeOn(Schedulers.boundedElastic())
+      .subscribe()
+
+    // WHEN Bob sends an email to Andre
+    Thread.sleep(500)
+    val requestBob =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:ietf:params:jmap:submission"],
+         |  "methodCalls": [
+         |     ["EmailSubmission/set", {
+         |       "accountId": "$ACCOUNT_ID",
+         |       "create": {
+         |         "k1490": {
+         |           "emailId": "${messageId.serialize}",
+         |           "envelope": {
+         |             "mailFrom": {"email": "${BOB.asString}"},
+         |             "rcptTo": [{"email": "${ANDRE.asString}"}]
+         |           }
+         |         }
+         |    }
+         |  }, "c1"]]
+         |}""".stripMargin
+
+    `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(requestBob)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+
+    awaitAtMostTenSeconds.untilAsserted(() => assertThat(seq.asJava).hasSize(0))
+  }
+
   @Test
   def pingShouldBeSupported(server: GuiceJamesServer): Unit = {
     val port = server.getProbe(classOf[JmapGuiceProbe]).getJmapPort.getValue
@@ -384,4 +557,31 @@ trait EventSourceContract {
     assertThat(seq.asJava)
       .hasSize(2)
   }
+
+  private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+    val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
+    smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
+      .authenticate(ANDRE.asString, ANDRE_PASSWORD)
+      .sendMessage(ANDRE.asString, BOB.asString())
+    smtpMessageSender.close()
+
+    awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
+  }
+
+  private def prepareDraftMessage(server: GuiceJamesServer) = {
+    val message: Message = Message.Builder
+      .of
+      .setSubject("test")
+      .setSender(BOB.asString)
+      .setFrom(BOB.asString)
+      .setTo(ANDRE.asString)
+      .setBody("testmail", StandardCharsets.UTF_8)
+      .build
+    val bobDraftsPath = MailboxPath.forUser(BOB, DefaultMailboxes.DRAFTS)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobDraftsPath)
+    val messageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobDraftsPath, AppendCommand.builder()
+      .build(message))
+      .getMessageId
+    messageId
+  }
 }
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala
index 33b9ef4c6d..599150875e 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebPushContract.scala
@@ -19,6 +19,14 @@
 
 package org.apache.james.jmap.rfc8621.contract
 
+import java.nio.charset.StandardCharsets
+import java.security.KeyPair
+import java.security.interfaces.{ECPrivateKey, ECPublicKey}
+import java.time.temporal.ChronoUnit
+import java.util.Base64
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.atomic.AtomicReference
+
 import com.google.crypto.tink.apps.webpush.WebPushHybridDecrypt
 import com.google.crypto.tink.subtle.EllipticCurves
 import com.google.crypto.tink.subtle.EllipticCurves.CurveType
@@ -33,7 +41,10 @@ import org.apache.james.jmap.core.ResponseObject.SESSION_STATE
 import org.apache.james.jmap.http.UserCredential
 import org.apache.james.jmap.rfc8621.contract.Fixture.{ACCEPT_RFC8621_VERSION_HEADER, ACCOUNT_ID, ANDRE, ANDRE_PASSWORD, BOB, BOB_PASSWORD, DOMAIN, authScheme, baseRequestSpecBuilder}
 import org.apache.james.jmap.rfc8621.contract.tags.CategoryTags
-import org.apache.james.mailbox.model.MailboxPath
+import org.apache.james.mailbox.DefaultMailboxes
+import org.apache.james.mailbox.MessageManager.AppendCommand
+import org.apache.james.mailbox.model.{MailboxConstants, MailboxPath, MessageId}
+import org.apache.james.mime4j.dom.Message
 import org.apache.james.modules.MailboxProbeImpl
 import org.apache.james.modules.protocols.SmtpGuiceProbe
 import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe, UpdatableTickingClock}
@@ -51,14 +62,6 @@ import org.mockserver.model.{HttpRequest, HttpResponse}
 import org.mockserver.verify.VerificationTimes
 import play.api.libs.json.{JsObject, JsString, Json}
 
-import java.nio.charset.StandardCharsets
-import java.security.KeyPair
-import java.security.interfaces.{ECPrivateKey, ECPublicKey}
-import java.time.temporal.ChronoUnit
-import java.util.Base64
-import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicReference
-
 trait WebPushContract {
   private lazy val awaitAtMostTenSeconds: ConditionFactory = Awaitility.`with`
     .pollInterval(ONE_HUNDRED_MILLISECONDS)
@@ -239,6 +242,128 @@ trait WebPushContract {
     }
   }
 
+  @Test
+  def shouldPushEmailDeliveryChangeWhenUserReceivesEmail(server: GuiceJamesServer, pushServer: ClientAndServer): Unit = {
+    setupPushSubscriptionForBob(pushServer)
+
+    // WHEN bob receives a mail
+    sendEmailToBob(server)
+
+    // THEN bob has a EmailDelivery stateChange on the push gateway
+    awaitAtMostTenSeconds.untilAsserted { () =>
+      pushServer.verify(HttpRequest.request()
+        .withPath(PUSH_URL_PATH)
+        .withBody(json(
+          s"""{
+             |    "@type": "StateChange",
+             |    "changed": {
+             |        "$ACCOUNT_ID": {
+             |          "EmailDelivery": "$${json-unit.any-string}"
+             |        }
+             |    }
+             |}""".stripMargin)),
+        VerificationTimes.atLeast(1))
+    }
+  }
+
+  @Test
+  def shouldNotPushEmailDeliveryChangeWhenUserCreatesDraftEmail(server: GuiceJamesServer, pushServer: ClientAndServer): Unit = {
+    setupPushSubscriptionForBob(pushServer)
+
+    // WHEN bob create a draft mail
+    val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).getMailboxId("#private", BOB.asString(), MailboxConstants.INBOX)
+    val request =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+         |  "methodCalls": [
+         |    ["Email/set", {
+         |      "accountId": "$ACCOUNT_ID",
+         |      "create": {
+         |        "aaaaaa":{
+         |          "mailboxIds": {
+         |             "${mailboxId.serialize}": true
+         |          },
+         |          "to": [{"email": "rcpt1@apache.org"}, {"email": "rcpt2@apache.org"}],
+         |          "from": [{"email": "${BOB.asString}"}]
+         |        }
+         |      }
+         |    }, "c1"]]
+         |}""".stripMargin
+
+    `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(request)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+
+    // THEN bob should not have a EmailDelivery stateChange on the push gateway
+    awaitAtMostTenSeconds.untilAsserted { () =>
+      pushServer.verify(HttpRequest.request()
+        .withPath(PUSH_URL_PATH)
+        .withBody(json(
+          s"""{
+             |    "@type": "StateChange",
+             |    "changed": {
+             |        "$ACCOUNT_ID": {
+             |          "EmailDelivery": "$${json-unit.any-string}"
+             |        }
+             |    }
+             |}""".stripMargin)),
+        VerificationTimes.never())
+    }
+  }
+
+  @Test
+  def shouldNotPushEmailDeliveryChangeWhenUserSendsEmail(server: GuiceJamesServer, pushServer: ClientAndServer): Unit = {
+    val messageId: MessageId = prepareDraftMessage(server)
+    setupPushSubscriptionForBob(pushServer)
+
+    // WHEN Bob sends an email to Andre
+    val requestBob =
+      s"""{
+         |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail", "urn:ietf:params:jmap:submission"],
+         |  "methodCalls": [
+         |     ["EmailSubmission/set", {
+         |       "accountId": "$ACCOUNT_ID",
+         |       "create": {
+         |         "k1490": {
+         |           "emailId": "${messageId.serialize}",
+         |           "envelope": {
+         |             "mailFrom": {"email": "${BOB.asString}"},
+         |             "rcptTo": [{"email": "${ANDRE.asString}"}]
+         |           }
+         |         }
+         |    }
+         |  }, "c1"]]
+         |}""".stripMargin
+
+    `given`
+      .header(ACCEPT.toString, ACCEPT_RFC8621_VERSION_HEADER)
+      .body(requestBob)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+
+    // THEN bob should not have a EmailDelivery stateChange on the push gateway
+    awaitAtMostTenSeconds.untilAsserted { () =>
+      pushServer.verify(HttpRequest.request()
+        .withPath(PUSH_URL_PATH)
+        .withBody(json(
+          s"""{
+             |    "@type": "StateChange",
+             |    "changed": {
+             |        "$ACCOUNT_ID": {
+             |          "EmailDelivery": "$${json-unit.any-string}"
+             |        }
+             |    }
+             |}""".stripMargin)),
+        VerificationTimes.never())
+    }
+  }
+
   @Test
   def webPushShouldNotPushToPushServerWhenExpiredSubscription(server: GuiceJamesServer, pushServer: ClientAndServer, clock: UpdatableTickingClock): Unit = {
     // Setup mock-server for callback
@@ -534,4 +659,95 @@ trait WebPushContract {
            |    }
            |}""".stripMargin)
   }
+
+  private def prepareDraftMessage(server: GuiceJamesServer) = {
+    val message: Message = Message.Builder
+      .of
+      .setSubject("test")
+      .setSender(BOB.asString)
+      .setFrom(BOB.asString)
+      .setTo(ANDRE.asString)
+      .setBody("testmail", StandardCharsets.UTF_8)
+      .build
+    val bobDraftsPath = MailboxPath.forUser(BOB, DefaultMailboxes.DRAFTS)
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(bobDraftsPath)
+    val messageId: MessageId = server.getProbe(classOf[MailboxProbeImpl]).appendMessage(BOB.asString(), bobDraftsPath, AppendCommand.builder()
+      .build(message))
+      .getMessageId
+    messageId
+  }
+
+  private def setupPushSubscriptionForBob(pushServer: ClientAndServer) = {
+    // Setup mock-server for callback
+    val bodyRequestOnPushServer: AtomicReference[String] = setupPushServerCallback(pushServer)
+
+    // WHEN bob creates a push subscription
+    val pushSubscriptionId: String = `given`
+      .body(
+        s"""{
+           |    "using": ["urn:ietf:params:jmap:core"],
+           |    "methodCalls": [
+           |      [
+           |        "PushSubscription/set",
+           |        {
+           |            "create": {
+           |                "4f29": {
+           |                  "deviceClientId": "a889-ffea-910",
+           |                  "url": "${getPushServerUrl(pushServer)}",
+           |                  "types": ["EmailDelivery"]
+           |                }
+           |              }
+           |        },
+           |        "c1"
+           |      ]
+           |    ]
+           |  }""".stripMargin)
+    .when
+      .post
+    .`then`
+      .statusCode(SC_OK)
+      .extract()
+      .jsonPath()
+      .get("methodResponses[0][1].created.4f29.id")
+
+    // THEN a validation code is sent
+    awaitAtMostTenSeconds.untilAsserted { () =>
+      pushServer.verify(HttpRequest.request()
+        .withPath(PUSH_URL_PATH)
+        .withBody(json(
+          s"""{
+             |    "@type": "PushVerification",
+             |    "pushSubscriptionId": "$pushSubscriptionId",
+             |    "verificationCode": "$${json-unit.any-string}"
+             |}""".stripMargin)),
+        VerificationTimes.atLeast(1))
+    }
+
+    // GIVEN bob retrieves the validation code from the mock server
+    val verificationCode: String = Json.parse(bodyRequestOnPushServer.get()).asInstanceOf[JsObject]
+      .value("verificationCode")
+      .asInstanceOf[JsString]
+      .value
+
+    // WHEN bob updates the validation code via JMAP
+    val updateVerificationCodeResponse: String = updateValidateVerificationCode(pushSubscriptionId, verificationCode)
+
+    // THEN it succeed
+    assertThatJson(updateVerificationCodeResponse)
+      .isEqualTo(
+        s"""{
+           |    "sessionState": "${SESSION_STATE.value}",
+           |    "methodResponses": [
+           |        [
+           |            "PushSubscription/set",
+           |            {
+           |                "updated": {
+           |                    "$pushSubscriptionId": {}
+           |                }
+           |            },
+           |            "c1"
+           |        ]
+           |    ]
+           |}""".stripMargin)
+  }
 }
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index 0cf1d3280c..6980fc1e71 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -20,6 +20,7 @@ package org.apache.james.jmap.rfc8621.contract
 
 import java.net.{ProtocolException, URI}
 import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
 
 import net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson
 import org.apache.james.GuiceJamesServer
@@ -32,9 +33,13 @@ import org.apache.james.mailbox.MessageManager.AppendCommand
 import org.apache.james.mailbox.model.MailboxACL.Right
 import org.apache.james.mailbox.model.{MailboxACL, MailboxPath}
 import org.apache.james.mime4j.dom.Message
+import org.apache.james.modules.protocols.SmtpGuiceProbe
 import org.apache.james.modules.{ACLProbeImpl, MailboxProbeImpl}
-import org.apache.james.utils.DataProbeImpl
+import org.apache.james.utils.{DataProbeImpl, SMTPMessageSender, SpoolerProbe}
 import org.assertj.core.api.Assertions.{assertThat, assertThatThrownBy}
+import org.awaitility.Awaitility
+import org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS
+import org.awaitility.core.ConditionFactory
 import org.junit.jupiter.api.{BeforeEach, Test, Timeout}
 import play.api.libs.json.{JsString, Json}
 import reactor.core.scala.publisher.SMono
@@ -52,6 +57,11 @@ import sttp.ws.{WebSocket, WebSocketFrame}
 import scala.jdk.CollectionConverters._
 
 trait WebSocketContract {
+  private lazy val awaitAtMostTenSeconds: ConditionFactory = Awaitility.`with`
+    .pollInterval(ONE_HUNDRED_MILLISECONDS)
+    .and.`with`.pollDelay(ONE_HUNDRED_MILLISECONDS)
+    .await
+    .atMost(10, TimeUnit.SECONDS)
   private lazy val backend: SttpBackend[Identity, WebSockets] = OkHttpSyncBackend()
   private lazy implicit val monadError: MonadError[Identity] = IdMonad
 
@@ -645,7 +655,7 @@ trait WebSocketContract {
     val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
 
     val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
-    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
 
     assertThat(response.toOption.get.asJava)
       .hasSize(2) // state change notification + API response
@@ -829,13 +839,101 @@ trait WebSocketContract {
     val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
 
     val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
-    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
+    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
 
     assertThat(response.toOption.get.asJava)
       .hasSize(2) // state change notification + API response
       .contains(stateChange)
   }
 
+  @Test
+  @Timeout(180)
+  def shouldPushEmailDeliveryChangeWhenUserReceivesEmail(server: GuiceJamesServer): Unit = {
+    server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": null
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            // Andre send mail to Bob
+            sendEmailToBob(server)
+
+            List(
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                })
+        })
+        .send(backend)
+        .body
+
+    // Bob should receive EmailDelivery state change
+    assertThat(response.toOption.get.asJava)
+      .hasSize(1) // state change notification
+      .allMatch(s => s.contains("EmailDelivery"))
+  }
+
+  @Test
+  @Timeout(180)
+  def shouldNotPushEmailDeliveryChangeWhenCreateDraftMail(server: GuiceJamesServer): Unit = {
+    val mailboxId = server.getProbe(classOf[MailboxProbeImpl]).createMailbox(MailboxPath.inbox(BOB))
+
+    val response: Either[String, List[String]] =
+      authenticatedRequest(server)
+        .response(asWebSocket[Identity, List[String]] {
+          ws =>
+            ws.send(WebSocketFrame.text(
+              """{
+                |  "@type": "WebSocketPushEnable",
+                |  "dataTypes": null
+                |}""".stripMargin))
+
+            Thread.sleep(100)
+
+            ws.send(WebSocketFrame.text(
+              s"""{
+                 |  "@type": "Request",
+                 |  "id": "req-36",
+                 |  "using": ["urn:ietf:params:jmap:core", "urn:ietf:params:jmap:mail"],
+                 |  "methodCalls": [
+                 |    ["Email/set", {
+                 |      "accountId": "$ACCOUNT_ID",
+                 |      "create": {
+                 |        "aaaaaa":{
+                 |          "mailboxIds": {
+                 |             "${mailboxId.serialize}": true
+                 |          }
+                 |        }
+                 |      }
+                 |    }, "c1"]]
+                 |}""".stripMargin))
+
+            List(
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                },
+              ws.receive()
+                .map { case t: Text =>
+                  t.payload
+                })
+        })
+        .send(backend)
+        .body
+
+    assertThat(response.toOption.get.asJava)
+      .hasSize(2) // state change notification + API response
+      .noneMatch(s => s.contains("EmailDelivery"))
+  }
+
   @Test
   @Timeout(180)
   def pushEnableShouldUpdatePreviousSubscriptions(server: GuiceJamesServer): Unit = {
@@ -1124,4 +1222,14 @@ trait WebSocketContract {
     basicRequest.get(Uri.apply(new URI(s"ws://127.0.0.1:$port/jmap/ws")))
       .header("Accept", ACCEPT_RFC8621_VERSION_HEADER)
   }
+
+  private def sendEmailToBob(server: GuiceJamesServer): Unit = {
+    val smtpMessageSender: SMTPMessageSender = new SMTPMessageSender(DOMAIN.asString())
+    smtpMessageSender.connect("127.0.0.1", server.getProbe(classOf[SmtpGuiceProbe]).getSmtpPort)
+      .authenticate(ANDRE.asString, ANDRE_PASSWORD)
+      .sendMessage(ANDRE.asString, BOB.asString())
+    smtpMessageSender.close()
+
+    awaitAtMostTenSeconds.until(() => server.getProbe(classOf[SpoolerProbe]).processingFinished())
+  }
 }
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index 600437e738..7d3b1bb020 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -139,7 +139,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
   private def emailStateMap(emailChange: EmailChange): Map[TypeName, State] =
     (Map(EmailTypeName -> UuidState.fromJava(emailChange.getState)) ++
       Some(UuidState.fromJava(emailChange.getState))
-        .filter(_ => !emailChange.getCreated.isEmpty)
+        .filter(_ => emailChange.isDelivery && !emailChange.getCreated.isEmpty)
         .map(emailDeliveryState => Map(EmailDeliveryTypeName -> emailDeliveryState))
         .getOrElse(Map())).toMap
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org