You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2021/06/17 03:11:11 UTC
[james-project] branch master updated: JAMES-3491 Do not send two
JMAP events upon new messages
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
The following commit(s) were added to refs/heads/master by this push:
new 8379d2b JAMES-3491 Do not send two JMAP events upon new messages
8379d2b is described below
commit 8379d2bd5f63ed947fe194c99260fb612cc18f2f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sat Jun 12 12:45:10 2021 +0700
JAMES-3491 Do not send two JMAP events upon new messages
Previously, we fired two events on the JMAP event bus:
- One for emails
- One for mailbox
This change aggregates both into a single notification
and thus reduces the RabbitMQ publish rate by around 30%.
---
.../apache/james/jmap/api/change/EmailChange.java | 95 ---------
.../jmap/api/change/MailboxAndEmailChange.java | 220 +++++++++++++++++++++
.../james/jmap/api/change/MailboxChange.java | 79 --------
.../jmap/rfc8621/contract/WebSocketContract.scala | 82 +++-----
.../james/jmap/change/MailboxChangeListener.scala | 41 ++--
.../jmap/change/MailboxChangeListenerTest.scala | 6 +-
6 files changed, 273 insertions(+), 250 deletions(-)
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
index 00e2733..e22d5c2 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/EmailChange.java
@@ -23,28 +23,13 @@ import java.time.ZonedDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
-import java.util.stream.Stream;
-import javax.inject.Inject;
-
-import org.apache.james.core.Username;
import org.apache.james.jmap.api.model.AccountId;
-import org.apache.james.mailbox.MessageIdManager;
-import org.apache.james.mailbox.SessionProvider;
-import org.apache.james.mailbox.events.MailboxEvents.Added;
-import org.apache.james.mailbox.events.MailboxEvents.Expunged;
-import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
import org.apache.james.mailbox.model.MessageId;
-import com.github.steveash.guavate.Guavate;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
public class EmailChange implements JmapChange {
public static class Builder {
@@ -129,86 +114,6 @@ public class EmailChange implements JmapChange {
return accountId -> state -> date -> isDelegated -> new Builder(accountId, state, date, isDelegated);
}
- public static class Factory {
- private final State.Factory stateFactory;
- private final MessageIdManager messageIdManager;
- private final SessionProvider sessionProvider;
-
- @Inject
- public Factory(State.Factory stateFactory, MessageIdManager messageIdManager, SessionProvider sessionProvider) {
- this.stateFactory = stateFactory;
- this.messageIdManager = messageIdManager;
- this.sessionProvider = sessionProvider;
- }
-
- public List<JmapChange> fromAdded(Added messageAdded, ZonedDateTime now, List<AccountId> sharees) {
- EmailChange ownerChange = EmailChange.builder()
- .accountId(AccountId.fromUsername(messageAdded.getUsername()))
- .state(stateFactory.generate())
- .date(now)
- .isDelegated(false)
- .created(messageAdded.getMessageIds())
- .build();
-
- Stream<EmailChange> shareeChanges = sharees.stream()
- .map(shareeId -> EmailChange.builder()
- .accountId(shareeId)
- .state(stateFactory.generate())
- .date(now)
- .isDelegated(true)
- .created(messageAdded.getMessageIds())
- .build());
-
- return Stream.concat(Stream.of(ownerChange), shareeChanges)
- .collect(Guavate.toImmutableList());
- }
-
- public List<JmapChange> fromFlagsUpdated(FlagsUpdated messageFlagUpdated, ZonedDateTime now, List<AccountId> sharees) {
- EmailChange ownerChange = EmailChange.builder()
- .accountId(AccountId.fromUsername(messageFlagUpdated.getUsername()))
- .state(stateFactory.generate())
- .date(now)
- .isDelegated(false)
- .updated(messageFlagUpdated.getMessageIds())
- .build();
-
- Stream<EmailChange> shareeChanges = sharees.stream()
- .map(shareeId -> EmailChange.builder()
- .accountId(shareeId)
- .state(stateFactory.generate())
- .date(now)
- .isDelegated(true)
- .updated(messageFlagUpdated.getMessageIds())
- .build());
-
- return Stream.concat(Stream.of(ownerChange), shareeChanges)
- .collect(Guavate.toImmutableList());
- }
-
- public Flux<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<Username> sharees) {
-
- Mono<EmailChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername());
-
- Flux<EmailChange> shareeChanges = Flux.fromIterable(sharees)
- .flatMap(shareeId -> fromExpunged(expunged, now, shareeId));
-
- return Flux.concat(ownerChange, shareeChanges);
- }
-
- private Mono<EmailChange> fromExpunged(Expunged expunged, ZonedDateTime now, Username username) {
- return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(),
- sessionProvider.createSystemSession(username)))
- .map(accessibleMessageIds -> EmailChange.builder()
- .accountId(AccountId.fromUsername(username))
- .state(stateFactory.generate())
- .date(now)
- .isDelegated(false)
- .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
- .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
- .build());
- }
- }
-
private final AccountId accountId;
private final State state;
private final ZonedDateTime date;
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
new file mode 100644
index 0000000..5256641
--- /dev/null
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxAndEmailChange.java
@@ -0,0 +1,220 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.jmap.api.change;
+
+import java.time.ZonedDateTime;
+import java.util.List;
+import java.util.stream.Stream;
+
+import javax.inject.Inject;
+import javax.mail.Flags;
+
+import org.apache.james.core.Username;
+import org.apache.james.jmap.api.model.AccountId;
+import org.apache.james.mailbox.MessageIdManager;
+import org.apache.james.mailbox.SessionProvider;
+import org.apache.james.mailbox.events.MailboxEvents;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class MailboxAndEmailChange implements JmapChange {
+ public static class Factory {
+ private final State.Factory stateFactory;
+ private final MessageIdManager messageIdManager;
+ private final SessionProvider sessionProvider;
+
+ @Inject
+ public Factory(State.Factory stateFactory, MessageIdManager messageIdManager, SessionProvider sessionProvider) {
+ this.stateFactory = stateFactory;
+ this.messageIdManager = messageIdManager;
+ this.sessionProvider = sessionProvider;
+ }
+
+ public List<JmapChange> fromAdded(MailboxEvents.Added messageAdded, ZonedDateTime now, List<AccountId> sharees) {
+ AccountId accountId = AccountId.fromUsername(messageAdded.getUsername());
+ State state = stateFactory.generate();
+ EmailChange ownerEmailChange = EmailChange.builder()
+ .accountId(accountId)
+ .state(state)
+ .date(now)
+ .isDelegated(false)
+ .created(messageAdded.getMessageIds())
+ .build();
+
+ MailboxChange ownerMailboxChange = MailboxChange.builder()
+ .accountId(AccountId.fromUsername(messageAdded.getUsername()))
+ .state(state)
+ .date(now)
+ .isCountChange(true)
+ .delegated(false)
+ .updated(ImmutableList.of(messageAdded.getMailboxId()))
+ .build();
+
+ MailboxAndEmailChange ownerChange = new MailboxAndEmailChange(accountId, ownerEmailChange, ownerMailboxChange);
+
+ Stream<MailboxAndEmailChange> shareeChanges = sharees.stream()
+ .map(shareeId -> new MailboxAndEmailChange(shareeId,
+ EmailChange.builder()
+ .accountId(shareeId)
+ .state(state)
+ .date(now)
+ .isDelegated(true)
+ .created(messageAdded.getMessageIds())
+ .build(),
+ MailboxChange.builder()
+ .accountId(shareeId)
+ .state(state)
+ .date(now)
+ .isCountChange(true)
+ .delegated(true)
+ .updated(ImmutableList.of(messageAdded.getMailboxId()))
+ .build()));
+
+ return Stream.concat(Stream.of(ownerChange), shareeChanges)
+ .collect(Guavate.toImmutableList());
+ }
+
+ public List<JmapChange> fromFlagsUpdated(MailboxEvents.FlagsUpdated messageFlagUpdated, ZonedDateTime now, List<AccountId> sharees) {
+ boolean isSeenChanged = messageFlagUpdated.getUpdatedFlags()
+ .stream()
+ .anyMatch(flags -> flags.isChanged(Flags.Flag.SEEN));
+ AccountId accountId = AccountId.fromUsername(messageFlagUpdated.getUsername());
+ EmailChange ownerEmailChange = EmailChange.builder()
+ .accountId(accountId)
+ .state(stateFactory.generate())
+ .date(now)
+ .isDelegated(false)
+ .updated(messageFlagUpdated.getMessageIds())
+ .build();
+
+ if (isSeenChanged) {
+ MailboxChange ownerMailboxChange = MailboxChange.builder()
+ .accountId(AccountId.fromUsername(messageFlagUpdated.getUsername()))
+ .state(stateFactory.generate())
+ .date(now)
+ .isCountChange(true)
+ .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
+ .build();
+ MailboxAndEmailChange ownerChange = new MailboxAndEmailChange(accountId, ownerEmailChange, ownerMailboxChange);
+
+ Stream<MailboxAndEmailChange> shareeChanges = sharees.stream()
+ .map(shareeId -> new MailboxAndEmailChange(shareeId,
+ EmailChange.builder()
+ .accountId(shareeId)
+ .state(stateFactory.generate())
+ .date(now)
+ .isDelegated(true)
+ .updated(messageFlagUpdated.getMessageIds())
+ .build(),
+ MailboxChange.builder()
+ .accountId(shareeId)
+ .state(stateFactory.generate())
+ .date(now)
+ .isCountChange(true)
+ .delegated(true)
+ .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
+ .delegated()
+ .build()));
+
+ return Stream.concat(Stream.of(ownerChange), shareeChanges)
+ .collect(Guavate.toImmutableList());
+ }
+ Stream<EmailChange> shareeChanges = sharees.stream()
+ .map(shareeId -> EmailChange.builder()
+ .accountId(shareeId)
+ .state(stateFactory.generate())
+ .date(now)
+ .isDelegated(true)
+ .updated(messageFlagUpdated.getMessageIds())
+ .build());
+
+ return Stream.concat(Stream.of(ownerEmailChange), shareeChanges)
+ .collect(Guavate.toImmutableList());
+ }
+
+ public Flux<JmapChange> fromExpunged(MailboxEvents.Expunged expunged, ZonedDateTime now, List<Username> sharees) {
+ State state = stateFactory.generate();
+ boolean delegated = true;
+ Mono<JmapChange> ownerChange = fromExpunged(expunged, now, expunged.getUsername(), state, !delegated);
+
+ Flux<JmapChange> shareeChanges = Flux.fromIterable(sharees)
+ .concatMap(shareeId -> fromExpunged(expunged, now, shareeId, state, delegated));
+
+ return Flux.concat(ownerChange, shareeChanges);
+ }
+
+ private Mono<JmapChange> fromExpunged(MailboxEvents.Expunged expunged, ZonedDateTime now, Username username, State state, boolean delegated) {
+ AccountId accountId = AccountId.fromUsername(username);
+ MailboxChange mailboxChange = MailboxChange.builder()
+ .accountId(accountId)
+ .state(state)
+ .date(now)
+ .isCountChange(true)
+ .delegated(delegated)
+ .updated(ImmutableList.of(expunged.getMailboxId()))
+ .build();
+
+ return Mono.from(messageIdManager.accessibleMessagesReactive(expunged.getMessageIds(), sessionProvider.createSystemSession(username)))
+ .<JmapChange>map(accessibleMessageIds -> new MailboxAndEmailChange(accountId,
+ EmailChange.builder()
+ .accountId(AccountId.fromUsername(username))
+ .state(state)
+ .date(now)
+ .isDelegated(delegated)
+ .updated(Sets.intersection(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+ .destroyed(Sets.difference(ImmutableSet.copyOf(expunged.getMessageIds()), accessibleMessageIds))
+ .build(), mailboxChange))
+ .switchIfEmpty(Mono.<JmapChange>just(mailboxChange));
+ }
+ }
+
+ private final AccountId accountId;
+ private final EmailChange emailChange;
+ private final MailboxChange mailboxChange;
+
+ public MailboxAndEmailChange(AccountId accountId, EmailChange emailChange, MailboxChange mailboxChange) {
+ Preconditions.checkArgument(accountId.equals(emailChange.getAccountId()));
+ Preconditions.checkArgument(accountId.equals(mailboxChange.getAccountId()));
+
+ this.accountId = accountId;
+ this.emailChange = emailChange;
+ this.mailboxChange = mailboxChange;
+ }
+
+ @Override
+ public AccountId getAccountId() {
+ return accountId;
+ }
+
+ public EmailChange getEmailChange() {
+ return emailChange;
+ }
+
+ public MailboxChange getMailboxChange() {
+ return mailboxChange;
+ }
+}
diff --git a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
index 3da4de7..1b20625 100644
--- a/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
+++ b/server/data/data-jmap/src/main/java/org/apache/james/jmap/api/change/MailboxChange.java
@@ -26,12 +26,8 @@ import java.util.Optional;
import java.util.stream.Stream;
import javax.inject.Inject;
-import javax.mail.Flags;
import org.apache.james.jmap.api.model.AccountId;
-import org.apache.james.mailbox.events.MailboxEvents.Added;
-import org.apache.james.mailbox.events.MailboxEvents.Expunged;
-import org.apache.james.mailbox.events.MailboxEvents.FlagsUpdated;
import org.apache.james.mailbox.events.MailboxEvents.MailboxACLUpdated;
import org.apache.james.mailbox.events.MailboxEvents.MailboxAdded;
import org.apache.james.mailbox.events.MailboxEvents.MailboxDeletion;
@@ -213,81 +209,6 @@ public class MailboxChange implements JmapChange {
return Stream.concat(Stream.of(ownerChange), shareeChanges)
.collect(Guavate.toImmutableList());
}
-
- public List<JmapChange> fromAdded(Added messageAdded, ZonedDateTime now, List<AccountId> sharees) {
- MailboxChange ownerChange = MailboxChange.builder()
- .accountId(AccountId.fromUsername(messageAdded.getUsername()))
- .state(stateFactory.generate())
- .date(now)
- .isCountChange(true)
- .updated(ImmutableList.of(messageAdded.getMailboxId()))
- .build();
-
- Stream<MailboxChange> shareeChanges = sharees.stream()
- .map(shareeId -> MailboxChange.builder()
- .accountId(shareeId)
- .state(stateFactory.generate())
- .date(now)
- .isCountChange(true)
- .updated(ImmutableList.of(messageAdded.getMailboxId()))
- .delegated()
- .build());
-
- return Stream.concat(Stream.of(ownerChange), shareeChanges)
- .collect(Guavate.toImmutableList());
- }
-
- public List<JmapChange> fromFlagsUpdated(FlagsUpdated messageFlagUpdated, ZonedDateTime now, List<AccountId> sharees) {
- boolean isSeenChanged = messageFlagUpdated.getUpdatedFlags()
- .stream()
- .anyMatch(flags -> flags.isChanged(Flags.Flag.SEEN));
- if (isSeenChanged) {
- MailboxChange ownerChange = MailboxChange.builder()
- .accountId(AccountId.fromUsername(messageFlagUpdated.getUsername()))
- .state(stateFactory.generate())
- .date(now)
- .isCountChange(true)
- .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
- .build();
-
- Stream<MailboxChange> shareeChanges = sharees.stream()
- .map(shareeId -> MailboxChange.builder()
- .accountId(shareeId)
- .state(stateFactory.generate())
- .date(now)
- .isCountChange(true)
- .updated(ImmutableList.of(messageFlagUpdated.getMailboxId()))
- .delegated()
- .build());
-
- return Stream.concat(Stream.of(ownerChange), shareeChanges)
- .collect(Guavate.toImmutableList());
- }
- return ImmutableList.of();
- }
-
- public List<JmapChange> fromExpunged(Expunged expunged, ZonedDateTime now, List<AccountId> sharees) {
- MailboxChange ownerChange = MailboxChange.builder()
- .accountId(AccountId.fromUsername(expunged.getUsername()))
- .state(stateFactory.generate())
- .date(now)
- .isCountChange(true)
- .updated(ImmutableList.of(expunged.getMailboxId()))
- .build();
-
- Stream<MailboxChange> shareeChanges = sharees.stream()
- .map(shareeId -> MailboxChange.builder()
- .accountId(shareeId)
- .state(stateFactory.generate())
- .date(now)
- .isCountChange(true)
- .updated(ImmutableList.of(expunged.getMailboxId()))
- .delegated()
- .build());
-
- return Stream.concat(Stream.of(ownerChange), shareeChanges)
- .collect(Guavate.toImmutableList());
- }
}
private final AccountId accountId;
diff --git a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
index 053b410..0223d8a 100644
--- a/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
+++ b/server/protocols/jmap-rfc-8621-integration-tests/jmap-rfc-8621-integration-tests-common/src/main/scala/org/apache/james/jmap/rfc8621/contract/WebSocketContract.scala
@@ -456,10 +456,6 @@ trait WebSocketContract {
ws.receive()
.map { case t: Text =>
t.payload
- },
- ws.receive()
- .map { case t: Text =>
- t.payload
})
})
.send(backend)
@@ -471,14 +467,12 @@ trait WebSocketContract {
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
- val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
- val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
- val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
- val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+ val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+ val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
- .hasSize(3) // email notification + mailbox notification + API response
- .contains(mailboxStateChange, emailStateChange)
+ .hasSize(2) // state change notification + API response
+ .contains(stateChange)
}
@Test
@@ -523,7 +517,7 @@ trait WebSocketContract {
createEmail(ws)
createEmail(ws)
- List.range(0, 15)
+ List.range(0, 10)
.map(i => ws.receive()
.map { case t: Text =>
t.payload
@@ -532,8 +526,8 @@ trait WebSocketContract {
.send(backend)
.body
- // 5 changes, each one generate one response, one email state change, one mailbox state change
- assertThat(response.toOption.get.asJava).hasSize(15)
+ // 5 changes, each one generate one response, one state change
+ assertThat(response.toOption.get.asJava).hasSize(10)
}
@Test
@@ -639,10 +633,6 @@ trait WebSocketContract {
ws.receive()
.map { case t: Text =>
t.payload
- },
- ws.receive()
- .map { case t: Text =>
- t.payload
})
})
.send(backend)
@@ -654,14 +644,12 @@ trait WebSocketContract {
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
- val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
- val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
- val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
- val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+ val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+ val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
- .hasSize(3) // email notification + mailbox notification + API response
- .contains(mailboxStateChange, emailStateChange)
+ .hasSize(2) // state change notification + API response
+ .contains(stateChange)
}
@Test
@@ -764,26 +752,21 @@ trait WebSocketContract {
.map { case t: Text =>
t.payload
}
- val stateChange3 =
- ws.receive()
- .map { case t: Text =>
- t.payload
- }
val response2 =
ws.receive()
.map { case t: Text =>
t.payload
}
- List(response1, response2, stateChange1, stateChange2, stateChange3)
+ List(response1, response2, stateChange1, stateChange2)
})
.send(backend)
.body
assertThat(response.toOption.get.asJava)
- .hasSize(5) // update flags response + email state change notif + destroy response + email state change notif + mailbox state change notif (count)
+ .hasSize(4) // update flags response + email state change notif + destroy response + combined email and mailbox (count) state change notif
assertThat(response.toOption.get.filter(s => s.startsWith("{\"@type\":\"StateChange\"")).asJava)
- .hasSize(3)
+ .hasSize(2)
.noneMatch(s => s.contains("EmailDelivery"))
}
@@ -834,10 +817,6 @@ trait WebSocketContract {
ws.receive()
.map { case t: Text =>
t.payload
- },
- ws.receive()
- .map { case t: Text =>
- t.payload
})
})
.send(backend)
@@ -849,14 +828,12 @@ trait WebSocketContract {
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
- val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
- val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
- val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
- val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+ val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+ val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","EmailDelivery":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
- .hasSize(3) // email notification + mailbox notification + API response
- .contains(mailboxStateChange, emailStateChange)
+ .hasSize(2) // state change notification + API response
+ .contains(stateChange)
}
@Test
@@ -922,15 +899,12 @@ trait WebSocketContract {
val emailState: State = jmapGuiceProbe.getLatestEmailState(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxState(accountId)
- val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
- val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
- val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
- val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+ val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+ val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}"""
assertThat(response.toOption.get.asJava)
- .hasSize(2) // No Email notification
+ .hasSize(2) // Method response + Mailbox state change, no Email notification
.contains(mailboxStateChange)
- .doesNotContain(emailStateChange)
}
@Test
@@ -970,10 +944,6 @@ trait WebSocketContract {
ws.receive()
.map { case t: Text =>
t.payload
- },
- ws.receive()
- .map { case t: Text =>
- t.payload
})
})
.send(backend)
@@ -986,14 +956,12 @@ trait WebSocketContract {
val emailState: State = jmapGuiceProbe.getLatestEmailStateWithDelegation(accountId)
val mailboxState: State = jmapGuiceProbe.getLatestMailboxStateWithDelegation(accountId)
- val globalState1: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), None).get.value
- val globalState2: String = PushState.fromOption(None, Some(UuidState.fromJava(emailState))).get.value
- val mailboxStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState1"}"""
- val emailStateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}"}},"pushState":"$globalState2"}"""
+ val globalState: String = PushState.fromOption(Some(UuidState.fromJava(mailboxState)), Some(UuidState.fromJava(emailState))).get.value
+ val stateChange: String = s"""{"@type":"StateChange","changed":{"$ACCOUNT_ID":{"Email":"${emailState.getValue}","Mailbox":"${mailboxState.getValue}"}},"pushState":"$globalState"}""".stripMargin
assertThat(response.toOption.get.asJava)
- .hasSize(2) // email notification + mailbox notification
- .contains(mailboxStateChange, emailStateChange)
+ .hasSize(1)
+ .contains(stateChange)
}
@Test
diff --git a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
index e3cb5b9..0fd1501 100644
--- a/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
+++ b/server/protocols/jmap-rfc-8621/src/main/scala/org/apache/james/jmap/change/MailboxChangeListener.scala
@@ -27,10 +27,10 @@ import org.apache.james.events.Event.EventId
import org.apache.james.events.EventListener.ReactiveGroupEventListener
import org.apache.james.events.{Event, EventBus, Group}
import org.apache.james.jmap.InjectionKeys
-import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxChange, MailboxChangeRepository}
+import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, JmapChange, MailboxAndEmailChange, MailboxChange, MailboxChangeRepository}
import org.apache.james.jmap.api.model.AccountId
import org.apache.james.jmap.change.MailboxChangeListener.LOGGER
-import org.apache.james.jmap.core.UuidState
+import org.apache.james.jmap.core.{State, UuidState}
import org.apache.james.mailbox.MailboxManager
import org.apache.james.mailbox.events.MailboxEvents.{Added, Expunged, FlagsUpdated, MailboxACLUpdated, MailboxAdded, MailboxDeletion, MailboxEvent, MailboxRenamed}
import org.apache.james.mailbox.model.{MailboxACL, MailboxId}
@@ -51,7 +51,7 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
mailboxChangeRepository: MailboxChangeRepository,
mailboxChangeFactory: MailboxChange.Factory,
emailChangeRepository: EmailChangeRepository,
- emailChangeFactory: EmailChange.Factory,
+ emailChangeFactory: MailboxAndEmailChange.Factory,
mailboxManager: MailboxManager,
clock: Clock) extends ReactiveGroupEventListener {
@@ -82,17 +82,14 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
SFlux.fromIterable(mailboxChangeFactory.fromMailboxDeletion(mailboxDeletion, now).asScala)
case added: Added =>
getSharees(mailboxId, username)
- .flatMapIterable(sharees => mailboxChangeFactory.fromAdded(added, now, sharees.asJava).asScala
- .concat(emailChangeFactory.fromAdded(added, now, sharees.asJava).asScala))
+ .flatMapIterable(sharees => emailChangeFactory.fromAdded(added, now, sharees.asJava).asScala)
case flagsUpdated: FlagsUpdated =>
getSharees(mailboxId, username)
- .flatMapIterable(sharees => mailboxChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala
- .concat(emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala))
+ .flatMapIterable(sharees => emailChangeFactory.fromFlagsUpdated(flagsUpdated, now, sharees.asJava).asScala)
case expunged: Expunged =>
getSharees(mailboxId, username)
- .flatMapMany(sharees => SFlux.concat(
- SFlux.fromIterable(mailboxChangeFactory.fromExpunged(expunged, now, sharees.asJava).asScala),
- emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava)))
+ .flatMapMany(sharees =>
+ SFlux(emailChangeFactory.fromExpunged(expunged, now, sharees.map(_.getIdentifier).map(Username.of).asJava)))
}
}
@@ -100,6 +97,8 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
SMono(jmapChange match {
case mailboxChange: MailboxChange => mailboxChangeRepository.save(mailboxChange)
case emailChange: EmailChange => emailChangeRepository.save(emailChange)
+ case mailboxAndEmailChange: MailboxAndEmailChange => mailboxChangeRepository.save(mailboxAndEmailChange.getMailboxChange)
+ .`then`(emailChangeRepository.save(mailboxAndEmailChange.getEmailChange))
}).`then`(SMono(eventBus.dispatch(toStateChangeEvent(jmapChange), AccountIdRegistrationKey(jmapChange.getAccountId))))
private def getSharees(mailboxId: MailboxId, username: Username): SMono[List[AccountId]] = {
@@ -123,14 +122,24 @@ case class MailboxChangeListener @Inject() (@Named(InjectionKeys.JMAP) eventBus:
case emailChange: EmailChange => StateChangeEvent(
eventId = EventId.random(),
username = Username.of(emailChange.getAccountId.getIdentifier),
- map = (Map(EmailTypeName -> UuidState.fromJava(emailChange.getState)) ++
- Some(UuidState.fromJava(emailChange.getState))
- .filter(_ => !emailChange.getCreated.isEmpty)
- .map(emailDeliveryState => Map(EmailDeliveryTypeName -> emailDeliveryState))
- .getOrElse(Map())).toMap)
+ map = emailStateMap(emailChange))
case mailboxChange: MailboxChange => StateChangeEvent(
eventId = EventId.random(),
username = Username.of(mailboxChange.getAccountId.getIdentifier),
- map = Map(MailboxTypeName -> UuidState.fromJava(mailboxChange.getState)))
+ map = mailboxStateMap(mailboxChange))
+ case mailboxAndEmailChange: MailboxAndEmailChange => StateChangeEvent(
+ eventId = EventId.random(),
+ username = Username.of(mailboxAndEmailChange.getAccountId.getIdentifier),
+ map = emailStateMap(mailboxAndEmailChange.getEmailChange) ++ mailboxStateMap(mailboxAndEmailChange.getMailboxChange))
}
+
+ private def mailboxStateMap(mailboxChange: MailboxChange): Map[TypeName, State] =
+ Map(MailboxTypeName -> UuidState.fromJava(mailboxChange.getState))
+
+ private def emailStateMap(emailChange: EmailChange): Map[TypeName, State] =
+ (Map(EmailTypeName -> UuidState.fromJava(emailChange.getState)) ++
+ Some(UuidState.fromJava(emailChange.getState))
+ .filter(_ => !emailChange.getCreated.isEmpty)
+ .map(emailDeliveryState => Map(EmailDeliveryTypeName -> emailDeliveryState))
+ .getOrElse(Map())).toMap
}
diff --git a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
index f4c0f7d..af266a4 100644
--- a/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
+++ b/server/protocols/jmap-rfc-8621/src/test/scala/org/apache/james/jmap/change/MailboxChangeListenerTest.scala
@@ -25,7 +25,7 @@ import java.util
import javax.mail.Flags
import org.apache.james.events.delivery.InVmEventDelivery
import org.apache.james.events.{Event, EventBus, EventListener, Group, InVMEventBus, MemoryEventDeadLetters, Registration, RegistrationKey, RetryBackoffConfiguration}
-import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, Limit, MailboxChange, MailboxChangeRepository, State}
+import org.apache.james.jmap.api.change.{EmailChange, EmailChangeRepository, Limit, MailboxAndEmailChange, MailboxChange, MailboxChangeRepository, State}
import org.apache.james.jmap.api.model.AccountId
import org.apache.james.jmap.change.MailboxChangeListenerTest.{ACCOUNT_ID, DEFAULT_NUMBER_OF_CHANGES}
import org.apache.james.jmap.memory.change.{MemoryEmailChangeRepository, MemoryMailboxChangeRepository}
@@ -54,7 +54,7 @@ class MailboxChangeListenerTest {
var mailboxManager: MailboxManager = _
var mailboxChangeFactory: MailboxChange.Factory = _
var emailChangeRepository: EmailChangeRepository = _
- var emailChangeFactory: EmailChange.Factory = _
+ var emailChangeFactory: MailboxAndEmailChange.Factory = _
var stateFactory: State.Factory = _
var listener: MailboxChangeListener = _
var clock: Clock = _
@@ -73,7 +73,7 @@ class MailboxChangeListenerTest {
stateFactory = new State.DefaultFactory
mailboxChangeFactory = new MailboxChange.Factory(stateFactory)
mailboxChangeRepository = new MemoryMailboxChangeRepository(DEFAULT_NUMBER_OF_CHANGES)
- emailChangeFactory = new EmailChange.Factory(stateFactory, resources.getMessageIdManager, resources.getMailboxManager)
+ emailChangeFactory = new MailboxAndEmailChange.Factory(stateFactory, resources.getMessageIdManager, resources.getMailboxManager)
emailChangeRepository = new MemoryEmailChangeRepository(DEFAULT_NUMBER_OF_CHANGES)
val eventBus = new EventBus {
override def register(listener: EventListener.ReactiveEventListener, key: RegistrationKey): Publisher[Registration] = Mono.empty()
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org