You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/06/17 03:11:11 UTC

[james-project] branch master updated: JAMES-3491 Do not send two JMAP events upon new messages

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


The following commit(s) were added to refs/heads/master by this push:
     new 8379d2b  JAMES-3491 Do not send two JMAP events upon new messages
8379d2b is described below

commit 8379d2bd5f63ed947fe194c99260fb612cc18f2f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Jun 12 12:45:10 2021 +0700

    JAMES-3491 Do not send two JMAP events upon new messages
    
    Before we were firing two events on the JMAP event bus:
    
     - One for emails
     - One for mailbox
    
    The following work aggregates both into a single notification
    and thus reduces of around 30% RabbitMQ publish rate.
---
 .../apache/james/jmap/api/change/EmailChange.java  |  95 ---------
 .../jmap/api/change/MailboxAndEmailChange.java     | 220 +++++++++++++++++++++
 .../james/jmap/api/change/MailboxChange.java       |  79 --------
 .../jmap/rfc8621/contract/WebSocketContract.scala  |  82 +++-----
 .../james/jmap/change/MailboxChangeListener.scala  |  41 ++--
 .../jmap/change/MailboxChangeListenerTest.scala    |   6 +-
 6 files changed, 273 insertions(+), 250 deletions(-)

diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
index 00e2733..e22d5c2 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
@@ -23,28 +23,13 @@ import java.time.ZonedDateTime;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
-import java.util.stream.Stream;
 
-import javax.inject.Inject;
-
-import org.apache.james.core.Username;
 import org.apache.james.jmap.api.model.AccountId;
-import org.apache.james.mailbox.MessageIdManager;
-import org.apache.james.mailbox.SessionProvider;
-import org.apache.james.mailbox.events.MailboxEvents.Added;
-import org.apache.james.mailbox.events.MailboxEvents.Expunged;
-import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
 import org.apache.james.mailbox.model.MessageId;
 
-import com.github.steveash.guavate.Guavate;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
 
 public class EmailChange implements JmapChange {
     public static class Builder {
@@ -129,86 +114,6 @@ public class EmailChange implements JmapChange {
         return accountId -> state -> date -> isDelegated -> new Builder(accountId, state, date, isDelegated);
     }
 
-    public static class Factory {
-        private final State.Factory stateFactory;
-        private final MessageIdManager messageIdManager;
-        private final SessionProvider sessionProvider;
-
-        @Inject
-        public Factory(State.Factory stateFactory, MessageIdManager messageIdManager, SessionProvider sessionProvider) {
-            this.stateFactory = stateFactory;
-            this.messageIdManager = messageIdManager;
-            this.sessionProvider = sessionProvider;
-        }
-
-        public List<JmapChange> fromAdded(Added messageAdded, ZonedDateTime now, List<AccountId> sharees) {
-            EmailChange ownerChange = EmailChange.builder()
-                .accountId(AccountId.fromUsername(messageAdded.getUsername()))
-                .state(stateFactory.generate())
-                .date(now)
-                .isDelegated(false)
-                .created(messageAdded.getMessageIds())
-                .build();
-
-            Stream<EmailChange> shareeChanges = sharees.stream()
-                .map(shareeId -> EmailChange.builder()
-                    .accountId(shareeId)
-                    .state(stateFactory.generate())
-                    .date(now)
-                    .isDelegated(true)
-                    .created(messageAdded.getMessageIds())
-                    .build());
-
-            return Stream.concat(Stream.of(ownerChange), shareeChanges)
-                .collect(Guavate.toImmutableList());
-        }
-
-        public List<JmapChange> fromFlagsUpdated(FlagsUpdated messageFlagUpdated, ZonedDateTime now, List<AccountId> sharees) {
-            EmailChange ownerChange = EmailChange.builder()
-                .accountId(AccountId.fromUsername(messageFlagUpdated.getUsername()))
-                .state(stateFactory.generate())
-                .date(now)
-                .isDelegated(false)
-                .updated(messageFlagUpdated.getMessageIds())
-                .build();
-
-            Stream<EmailChange> shareeChanges = sharees.stream()
-                .map(shareeId -> EmailChange.builder()
-                    .accountId(shareeId)
-                    .state(stateFactory.generate())
-                    .date(now)
-                    .isDelegated(true)
-                    .updated(messageFlagUpdated.getMessageIds())
-                    .build());
-
-            return Stream.concat(Stream.of(ownerChange), shareeChanges)
-                .collect(Guavate.toImmutableList());
-        }
-
-        public Flux<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) {
-
-            Mono<EmailChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername());
-
-            Flux<EmailChange> shareeChanges = Flux.fromIterable(sharees)
-                .flatMap(shareeId -> fromExpunged(expunged, now, shareeId));
-
-            return Flux.concat(ownerChange, shareeChanges);
-        }
-
-        private Mono<EmailChange> fromExpunged(Expunged expunged, ZonedDateTime now, Username username) {
-            return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(),
-                sessionProvider.createSystemSession(username)))
-                .map(accessibleMessageIds -> EmailChange.builder()
-                    .accountId(AccountId.fromUsername(username))
-                    .state(stateFactory.generate())
-                    .date(now)
-                    .isDelegated(false)
-                    .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
-                    .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
-                    .build());
-        }
-    }
-
     private final AccountId accountId;
     private final State state;
     private final ZonedDateTime date;
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
new file mode 100644
index 0000000..5256641
--- /dev/null
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
@@ -0,0 +1,220 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.jmap.api.change;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+import javax.mail.Flags;
+
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.AccountId;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.SessionProvider;
+import org.apache.james.mailbox.events.MailboxEvents;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class MailboxAndEmailChange implements JmapChange {
+    public static class Factory {
+        private final State.Factory stateFactory;
+        private final MessageIdManager messageIdManager;
+        private final SessionProvider sessionProvider;
+
+        @Inject
+        public Factory(State.Factory stateFactory, MessageIdManager messageIdManager, SessionProvider sessionProvider) {
+            this.stateFactory = stateFactory;
+            this.messageIdManager = messageIdManager;
+            this.sessionProvider = sessionProvider;
+        }
+
+        public List<JmapChange> fromAdded(MailboxEvents.Added messageAdded, ZonedDateTime now, List<AccountId> sharees) {
+            AccountId accountId = AccountId.fromUsername(messageAdded.getUsername());
+            State state = stateFactory.generate();
+            EmailChange ownerEmailChange = EmailChange.builder()
+                .accountId(accountId)
+                .state(state)
+                .date(now)
+                .isDelegated(false)
+                .created(messageAdded.getMessageIds())
+                .build();
+
+            MailboxChange ownerMailboxChange = MailboxChange.builder()
+                .accountId(AccountId.fromUsername(messageAdded.getUsername()))
+                .state(state)
+                .date(now)
+                .isCountChange(true)
+                .delegated(false)
+                .updated(ImmutableList.of(messageAdded.getMailboxId()))
+                .build();
+
+            MailboxAndEmailChange ownerChange = new MailboxAndEmailChange(accountId, ownerEmailChange, ownerMailboxChange);
+
+            Stream<MailboxAndEmailChange> shareeChanges = sharees.stream()
+                .map(shareeId -> new MailboxAndEmailChange(shareeId,
+                        EmailChange.builder()
+                            .accountId(shareeId)
+                            .state(state)
+                            .date(now)
+                            .isDelegated(true)
+                            .created(messageAdded.getMessageIds())
+                            .build(),
+                        MailboxChange.builder()
+                            .accountId(shareeId)
+                            .state(state)
+                            .date(now)
+                            .isCountChange(true)
+                            .delegated(true)
+                            .updated(ImmutableList.of(messageAdded.getMailboxId()))
+                            .build()));
+
+            return Stream.concat(Stream.of(ownerChange), shareeChanges)
+                .collect(Guavate.toImmutableList());
+        }
+
+        public List<JmapChange> fromFlagsUpdated(MailboxEvents.FlagsUpdated messageFlagUpdated, ZonedDateTime now, List<AccountId> sharees) {
+            boolean isSeenChanged = messageFlagUpdated.getUpdatedFlags()
+                .stream()
+                .anyMatch(flags -> flags.isChanged(Flags.Flag.SEEN));
+            AccountId accountId = AccountId.fromUsername(messageFlagUpdated.getUsername());
+            EmailChange ownerEmailChange = EmailChange.builder()
+                .accountId(accountId)
+                .state(stateFactory.generate())
+                .date(now)
+                .isDelegated(false)
+                .updated(messageFlagUpdated.getMessageIds())
+                .build();
+
+            if (isSeenChanged) {
+                MailboxChange ownerMailboxChange = MailboxChange.builder()
+                    .accountId(AccountId.fromUsername(messageFlagUpdated.getUsername()))
+                    .state(stateFactory.generate())
+                    .date(now)
+                    .isCountChange(true)
+                    .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
+                    .build();
+                MailboxAndEmailChange ownerChange = new MailboxAndEmailChange(accountId, ownerEmailChange, ownerMailboxChange);
+
+                Stream<MailboxAndEmailChange> shareeChanges = sharees.stream()
+                    .map(shareeId -> new MailboxAndEmailChange(shareeId,
+                        EmailChange.builder()
+                            .accountId(shareeId)
+                            .state(stateFactory.generate())
+                            .date(now)
+                            .isDelegated(true)
+                            .updated(messageFlagUpdated.getMessageIds())
+                            .build(),
+                        MailboxChange.builder()
+                            .accountId(shareeId)
+                            .state(stateFactory.generate())
+                            .date(now)
+                            .isCountChange(true)
+                            .delegated(true)
+                            .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
+                            .delegated()
+                            .build()));
+
+                return Stream.concat(Stream.of(ownerChange), shareeChanges)
+                    .collect(Guavate.toImmutableList());
+            }
+            Stream<EmailChange> shareeChanges = sharees.stream()
+                .map(shareeId -> EmailChange.builder()
+                    .accountId(shareeId)
+                    .state(stateFactory.generate())
+                    .date(now)
+                    .isDelegated(true)
+                    .updated(messageFlagUpdated.getMessageIds())
+                    .build());
+
+            return Stream.concat(Stream.of(ownerEmailChange), shareeChanges)
+                .collect(Guavate.toImmutableList());
+        }
+
+        public Flux<JmapChange> fromExpunged(MailboxEvents.Expunged expunged, ZonedDateTime now, List<Username> sharees) {
+            State state = stateFactory.generate();
+            boolean delegated = true;
+            Mono<JmapChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername(), state, !delegated);
+
+            Flux<JmapChange> shareeChanges = Flux.fromIterable(sharees)
+                .concatMap(shareeId -> fromExpunged(expunged, now, shareeId, state, delegated));
+
+            return Flux.concat(ownerChange, shareeChanges);
+        }
+
+        private Mono<JmapChange> fromExpunged(MailboxEvents.Expunged expunged, ZonedDateTime now, Username username, State state, boolean delegated) {
+            AccountId accountId = AccountId.fromUsername(username);
+            MailboxChange mailboxChange = MailboxChange.builder()
+                .accountId(accountId)
+                .state(state)
+                .date(now)
+                .isCountChange(true)
+                .delegated(delegated)
+                .updated(ImmutableList.of(expunged.getMailboxId()))
+                .build();
+
+            return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(), sessionProvider.createSystemSession(username)))
+                .<JmapChange>map(accessibleMessageIds -> new MailboxAndEmailChange(accountId,
+                    EmailChange.builder()
+                        .accountId(AccountId.fromUsername(username))
+                        .state(state)
+                        .date(now)
+                        .isDelegated(delegated)
+                        .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+                        .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+                        .build(), mailboxChange))
+                .switchIfEmpty(Mono.<JmapChange>just(mailboxChange));
+        }
+    }
+
+    private final AccountId accountId;
+    private final EmailChange emailChange;
+    private final MailboxChange mailboxChange;
+
+    public MailboxAndEmailChange(AccountId accountId, EmailChange emailChange, MailboxChange mailboxChange) {
+        Preconditions.checkArgument(accountId.equals(emailChange.getAccountId()));
+        Preconditions.checkArgument(accountId.equals(mailboxChange.getAccountId()));
+
+        this.accountId = accountId;
+        this.emailChange = emailChange;
+        this.mailboxChange = mailboxChange;
+    }
+
+    @Override
+    public AccountId getAccountId() {
+        return accountId;
+    }
+
+    public EmailChange getEmailChange() {
+        return emailChange;
+    }
+
+    public MailboxChange getMailboxChange() {
+        return mailboxChange;
+    }
+}
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
index 3da4de7..1b20625 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
@@ -26,12 +26,8 @@ import java.util.Optional;
 import java.util.stream.Stream;
 
 import javax.inject.Inject;
-import javax.mail.Flags;
 
 import org.apache.james.jmap.api.model.AccountId;
-import org.apache.james.mailbox.events.MailboxEvents.Added;
-import org.apache.james.mailbox.events.MailboxEvents.Expunged;
-import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
 import org.apache.james.mailbox.events.MailboxEvents.MailboxACLUpdated;
 import org.apache.james.mailbox.events.MailboxEvents.MailboxAdded;
 import org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
@@ -213,81 +209,6 @@ public class MailboxChange implements JmapChange {
             return Stream.concat(Stream.of(ownerChange), shareeChanges)
                 .collect(Guavate.toImmutableList());
         }
-
-        public List<JmapChange> fromAdded(Added messageAdded, ZonedDateTime now, List<AccountId> sharees) {
-            MailboxChange ownerChange = MailboxChange.builder()
-                .accountId(AccountId.fromUsername(messageAdded.getUsername()))
-                .state(stateFactory.generate())
-                .date(now)
-                .isCountChange(true)
-                .updated(ImmutableList.of(messageAdded.getMailboxId()))
-                .build();
-
-            Stream<MailboxChange> shareeChanges = sharees.stream()
-                .map(shareeId -> MailboxChange.builder()
-                    .accountId(shareeId)
-                    .state(stateFactory.generate())
-                    .date(now)
-                    .isCountChange(true)
-                    .updated(ImmutableList.of(messageAdded.getMailboxId()))
-                    .delegated()
-                    .build());
-
-            return Stream.concat(Stream.of(ownerChange), shareeChanges)
-                .collect(Guavate.toImmutableList());
-        }
-
-        public List<JmapChange> fromFlagsUpdated(FlagsUpdated messageFlagUpdated, ZonedDateTime now, List<AccountId> sharees) {
-            boolean isSeenChanged = messageFlagUpdated.getUpdatedFlags()
-                .stream()
-                .anyMatch(flags -> flags.isChanged(Flags.Flag.SEEN));
-            if (isSeenChanged) {
-                MailboxChange ownerChange = MailboxChange.builder()
-                    .accountId(AccountId.fromUsername(messageFlagUpdated.getUsername()))
-                    .state(stateFactory.generate())
-                    .date(now)
-                    .isCountChange(true)
-                    .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
-                    .build();
-
-                Stream<MailboxChange> shareeChanges = sharees.stream()
-                    .map(shareeId -> MailboxChange.builder()
-                        .accountId(shareeId)
-                        .state(stateFactory.generate())
-                        .date(now)
-                        .isCountChange(true)
-                        .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
-                        .delegated()
-                        .build());
-
-                return Stream.concat(Stream.of(ownerChange), shareeChanges)
-                    .collect(Guavate.toImmutableList());
-            }
-            return ImmutableList.of();
-        }
-
-        public List<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<AccountId> sharees) {
-            MailboxChange ownerChange = MailboxChange.builder()
-                .accountId(AccountId.fromUsername(expunged.getUsername()))
-                .state(stateFactory.generate())
-                .date(now)
-                .isCountChange(true)
-                .updated(ImmutableList.of(expunged.getMailboxId()))
-                .build();
-
-            Stream<MailboxChange> shareeChanges = sharees.stream()
-                .map(shareeId -> MailboxChange.builder()
-                    .accountId(shareeId)
-                    .state(stateFactory.generate())
-                    .date(now)
-                    .isCountChange(true)
-                    .updated(ImmutableList.of(expunged.getMailboxId()))
-                    .delegated()
-                    .build());
-
-            return Stream.concat(Stream.of(ownerChange), shareeChanges)
-                .collect(Guavate.toImmutableList());
-        }
     }
 
     private final AccountId accountId;
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index 053b410..0223d8a 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -456,10 +456,6 @@ trait WebSocketContract {
               ws.receive()
                 .map { case t: Text =>
                   t.payload
-                },
-              ws.receive()
-                .map { case t: Text =>
-                  t.payload
                 })
         })
         .send(backend)
@@ -471,14 +467,12 @@ trait WebSocketContract {
     val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
     val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
 
-    val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
-    val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
-    val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
-    val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+    val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
 
     assertThat(response.toOption.get.asJava)
-      .hasSize(3) // email notification + mailbox notification + API response
-      .contains(mailboxStateChange, emailStateChange)
+      .hasSize(2) // state change notification + API response
+      .contains(stateChange)
   }
 
   @Test
@@ -523,7 +517,7 @@ trait WebSocketContract {
             createEmail(ws)
             createEmail(ws)
 
-            List.range(0, 15)
+            List.range(0, 10)
               .map(i => ws.receive()
                 .map { case t: Text =>
                   t.payload
@@ -532,8 +526,8 @@ trait WebSocketContract {
         .send(backend)
         .body
 
-    // 5 changes, each one generate one response, one email state change, one mailbox state change
-    assertThat(response.toOption.get.asJava).hasSize(15)
+    // 5 changes, each one generate one response, one state change
+    assertThat(response.toOption.get.asJava).hasSize(10)
   }
 
   @Test
@@ -639,10 +633,6 @@ trait WebSocketContract {
               ws.receive()
                 .map { case t: Text =>
                   t.payload
-                },
-              ws.receive()
-                .map { case t: Text =>
-                  t.payload
                 })
         })
         .send(backend)
@@ -654,14 +644,12 @@ trait WebSocketContract {
     val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
     val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
 
-    val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
-    val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
-    val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
-    val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+    val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
 
     assertThat(response.toOption.get.asJava)
-      .hasSize(3) // email notification + mailbox notification + API response
-      .contains(mailboxStateChange, emailStateChange)
+      .hasSize(2) // state change notification + API response
+      .contains(stateChange)
   }
 
   @Test
@@ -764,26 +752,21 @@ trait WebSocketContract {
               .map { case t: Text =>
                 t.payload
               }
-            val stateChange3 =
-              ws.receive()
-                .map { case t: Text =>
-                  t.payload
-                }
             val response2 =
               ws.receive()
                 .map { case t: Text =>
                   t.payload
                 }
 
-            List(response1, response2, stateChange1, stateChange2, stateChange3)
+            List(response1, response2, stateChange1, stateChange2)
         })
         .send(backend)
         .body
 
     assertThat(response.toOption.get.asJava)
-      .hasSize(5) // update flags response + email state change notif + destroy response + email state change notif + mailbox state change notif (count)
+      .hasSize(4) // update flags response + email state change notif + destroy response + email state change notif + mailbox state change notif (count)
     assertThat(response.toOption.get.filter(s => s.startsWith("{\"@type\":\"StateChange\"")).asJava)
-      .hasSize(3)
+      .hasSize(2)
       .noneMatch(s => s.contains("EmailDelivery"))
   }
 
@@ -834,10 +817,6 @@ trait WebSocketContract {
               ws.receive()
                 .map { case t: Text =>
                   t.payload
-                },
-              ws.receive()
-                .map { case t: Text =>
-                  t.payload
                 })
         })
         .send(backend)
@@ -849,14 +828,12 @@ trait WebSocketContract {
     val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
     val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
 
-    val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
-    val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
-    val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
-    val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+    val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
 
     assertThat(response.toOption.get.asJava)
-      .hasSize(3) // email notification + mailbox notification + API response
-      .contains(mailboxStateChange, emailStateChange)
+      .hasSize(2) // state change notification + API response
+      .contains(stateChange)
   }
 
   @Test
@@ -922,15 +899,12 @@ trait WebSocketContract {
     val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
     val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
 
-    val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
-    val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
-    val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
-    val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+    val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+    val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}"""
 
     assertThat(response.toOption.get.asJava)
-      .hasSize(2) // No Email notification
+      .hasSize(2) // Method response + Mailbox state change, no Email notification
       .contains(mailboxStateChange)
-      .doesNotContain(emailStateChange)
   }
 
   @Test
@@ -970,10 +944,6 @@ trait WebSocketContract {
               ws.receive()
                 .map { case t: Text =>
                   t.payload
-                },
-              ws.receive()
-                .map { case t: Text =>
-                  t.payload
                 })
         })
         .send(backend)
@@ -986,14 +956,12 @@ trait WebSocketContract {
     val emailState: State = jmapGuiceProbe.getLatestEmailStateWithDelegation(accountId)
     val mailboxState: State = jmapGuiceProbe.getLatestMailboxStateWithDelegation(accountId)
 
-    val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
-    val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
-    val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
-    val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+    val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+    val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
 
     assertThat(response.toOption.get.asJava)
-      .hasSize(2) // email notification + mailbox notification
-      .contains(mailboxStateChange, emailStateChange)
+      .hasSize(1)
+      .contains(stateChange)
   }
 
   @Test
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index e3cb5b9..0fd1501 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -27,10 +27,10 @@ import org.apache.james.events.Event.EventId
 import org.apache.james.events.EventListener.ReactiveGroupEventListener
 import org.apache.james.events.{Event, EventBus, Group}
 import org.apache.james.jmap.InjectionKeys
-import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxChange, MailboxChangeRepository}
+import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxAndEmailChange, MailboxChange, MailboxChangeRepository}
 import org.apache.james.jmap.api.model.AccountId
 import org.apache.james.jmap.change.MailboxChangeListener.LOGGER
-import org.apache.james.jmap.core.UuidState
+import org.apache.james.jmap.core.{State, UuidState}
 import org.apache.james.mailbox.MailboxManager
 import org.apache.james.mailbox.events.MailboxEvents.{Added, Expunged, FlagsUpdated, MailboxACLUpdated, MailboxAdded, MailboxDeletion, MailboxEvent, MailboxRenamed}
 import org.apache.james.mailbox.model.{MailboxACL, MailboxId}
@@ -51,7 +51,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
                                             mailboxChangeRepository: MailboxChangeRepository,
                                             mailboxChangeFactory: MailboxChange.Factory,
                                             emailChangeRepository: EmailChangeRepository,
-                                            emailChangeFactory: EmailChange.Factory,
+                                            emailChangeFactory: MailboxAndEmailChange.Factory,
                                             mailboxManager: MailboxManager,
                                             clock: Clock) extends ReactiveGroupEventListener {
 
@@ -82,17 +82,14 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
         SFlux.fromIterable(mailboxChangeFactory.fromMailboxDeletion(mailboxDeletion, now).asScala)
       case added: Added =>
         getSharees(mailboxId, username)
-          .flatMapIterable(sharees => mailboxChangeFactory.fromAdded(added, now, sharees.asJava).asScala
-            .concat(emailChangeFactory.fromAdded(added, now, sharees.asJava).asScala))
+          .flatMapIterable(sharees => emailChangeFactory.fromAdded(added, now, sharees.asJava).asScala)
       case flagsUpdated: FlagsUpdated =>
         getSharees(mailboxId, username)
-          .flatMapIterable(sharees => mailboxChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala
-          .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala))
+          .flatMapIterable(sharees => emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala)
       case expunged: Expunged =>
         getSharees(mailboxId, username)
-          .flatMapMany(sharees => SFlux.concat(
-            SFlux.fromIterable(mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala),
-            emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava)))
+          .flatMapMany(sharees =>
+            SFlux(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava)))
     }
   }
 
@@ -100,6 +97,8 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
     SMono(jmapChange match {
       case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange)
       case emailChange: EmailChange => emailChangeRepository.save(emailChange)
+      case mailboxAndEmailChange: MailboxAndEmailChange => mailboxChangeRepository.save(mailboxAndEmailChange.getMailboxChange)
+        .`then`(emailChangeRepository.save(mailboxAndEmailChange.getEmailChange))
     }).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), AccountIdRegistrationKey(jmapChange.getAccountId))))
 
   private def getSharees(mailboxId: MailboxId, username: Username): SMono[List[AccountId]] = {
@@ -123,14 +122,24 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
     case emailChange: EmailChange => StateChangeEvent(
       eventId = EventId.random(),
       username = Username.of(emailChange.getAccountId.getIdentifier),
-      map = (Map(EmailTypeName -> UuidState.fromJava(emailChange.getState)) ++
-        Some(UuidState.fromJava(emailChange.getState))
-          .filter(_ => !emailChange.getCreated.isEmpty)
-          .map(emailDeliveryState => Map(EmailDeliveryTypeName -> emailDeliveryState))
-          .getOrElse(Map())).toMap)
+      map = emailStateMap(emailChange))
     case mailboxChange: MailboxChange => StateChangeEvent(
       eventId = EventId.random(),
       username = Username.of(mailboxChange.getAccountId.getIdentifier),
-      map = Map(MailboxTypeName -> UuidState.fromJava(mailboxChange.getState)))
+      map = mailboxStateMap(mailboxChange))
+    case mailboxAndEmailChange: MailboxAndEmailChange => StateChangeEvent(
+      eventId = EventId.random(),
+      username = Username.of(mailboxAndEmailChange.getAccountId.getIdentifier),
+      map = emailStateMap(mailboxAndEmailChange.getEmailChange) ++ mailboxStateMap(mailboxAndEmailChange.getMailboxChange))
   }
+
+  private def mailboxStateMap(mailboxChange: MailboxChange): Map[TypeName, State] =
+    Map(MailboxTypeName -> UuidState.fromJava(mailboxChange.getState))
+
+  private def emailStateMap(emailChange: EmailChange): Map[TypeName, State] =
+    (Map(EmailTypeName -> UuidState.fromJava(emailChange.getState)) ++
+      Some(UuidState.fromJava(emailChange.getState))
+        .filter(_ => !emailChange.getCreated.isEmpty)
+        .map(emailDeliveryState => Map(EmailDeliveryTypeName -> emailDeliveryState))
+        .getOrElse(Map())).toMap
 }
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
index f4c0f7d..af266a4 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
@@ -25,7 +25,7 @@ import java.util
 import javax.mail.Flags
 import org.apache.james.events.delivery.InVmEventDelivery
 import org.apache.james.events.{Event, EventBus, EventListener, Group, InVMEventBus, MemoryEventDeadLetters, Registration, RegistrationKey, RetryBackoffConfiguration}
-import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, Limit, MailboxChange, MailboxChangeRepository, State}
+import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, Limit, MailboxAndEmailChange, MailboxChange, MailboxChangeRepository, State}
 import org.apache.james.jmap.api.model.AccountId
 import org.apache.james.jmap.change.MailboxChangeListenerTest.{ACCOUNT_ID, DEFAULT_NUMBER_OF_CHANGES}
 import org.apache.james.jmap.memory.change.{MemoryEmailChangeRepository, MemoryMailboxChangeRepository}
@@ -54,7 +54,7 @@ class MailboxChangeListenerTest {
   var mailboxManager: MailboxManager = _
   var mailboxChangeFactory: MailboxChange.Factory = _
   var emailChangeRepository: EmailChangeRepository = _
-  var emailChangeFactory: EmailChange.Factory = _
+  var emailChangeFactory: MailboxAndEmailChange.Factory = _
   var stateFactory: State.Factory = _
   var listener: MailboxChangeListener = _
   var clock: Clock = _
@@ -73,7 +73,7 @@ class MailboxChangeListenerTest {
     stateFactory = new State.DefaultFactory
     mailboxChangeFactory = new MailboxChange.Factory(stateFactory)
     mailboxChangeRepository = new MemoryMailboxChangeRepository(DEFAULT_NUMBER_OF_CHANGES)
-    emailChangeFactory = new EmailChange.Factory(stateFactory, resources.getMessageIdManager, resources.getMailboxManager)
+    emailChangeFactory = new MailboxAndEmailChange.Factory(stateFactory, resources.getMessageIdManager, resources.getMailboxManager)
     emailChangeRepository = new MemoryEmailChangeRepository(DEFAULT_NUMBER_OF_CHANGES)
     val eventBus = new EventBus {
       override def register(listener: EventListener.ReactiveEventListener, key: RegistrationKey): Publisher[Registration] = Mono.empty()

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org