You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/04/03 02:09:47 UTC
[james-project] 04/04: JAMES-3130 MessageIdMapper::setFlags should
return all results
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4900a05607e7005654bc1347c3e0e6c700a155a3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Mar 31 17:00:24 2020 +0700
JAMES-3130 MessageIdMapper::setFlags should return all results
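When a message (identified by its messageId) is stored several times in
the same mailbox, setFlags must return an UpdatedFlags entry for each
stored copy instead of keeping only the first one.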
---
.../cassandra/mail/CassandraMessageIdMapper.java | 22 ++++-----
.../inmemory/mail/InMemoryMessageIdMapper.java | 16 ++-----
.../james/mailbox/store/StoreMessageIdManager.java | 13 +++---
.../james/mailbox/store/mail/MessageIdMapper.java | 12 ++++-
.../store/mail/model/MessageIdMapperTest.java | 54 ++++++++++++++--------
5 files changed, 67 insertions(+), 50 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 3aeb060..0eeca22 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -21,9 +21,7 @@ package org.apache.james.mailbox.cassandra.mail;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.function.BiFunction;
import javax.mail.Flags;
@@ -59,7 +57,6 @@ import reactor.core.scheduler.Schedulers;
public class CassandraMessageIdMapper implements MessageIdMapper {
private static final Logger LOGGER = LoggerFactory.getLogger(CassandraMessageIdMapper.class);
- public static final BiFunction<UpdatedFlags, UpdatedFlags, UpdatedFlags> KEEP_FIRST = (a, b) -> a;
private final MailboxMapper mailboxMapper;
private final CassandraMailboxDAO mailboxDAO;
@@ -207,14 +204,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
@Override
- public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+ public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
return Flux.fromIterable(mailboxIds)
.distinct()
.map(mailboxId -> (CassandraId) mailboxId)
.filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
.concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
.flatMap(this::updateCounts)
- .collect(Guavate.toImmutableMap(Pair::getLeft, Pair::getRight, KEEP_FIRST))
+ .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight))
.block();
}
@@ -223,15 +220,16 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.hasElements();
}
- private Mono<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+ private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
.single()
.retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
- .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()))
.onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
return Mono.empty();
- });
+ })
+ .flatMapMany(Flux::fromIterable)
+ .map(pair -> buildUpdatedFlags(pair.getRight(), pair.getLeft()));
}
private Pair<MailboxId, UpdatedFlags> buildUpdatedFlags(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, Flags oldFlags) {
@@ -250,7 +248,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
.thenReturn(pair);
}
- private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
+ private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
try {
return updateFlags(mailboxId, messageId, newState, updateMode);
} catch (MailboxException e) {
@@ -259,12 +257,12 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
}
- private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+ private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
CassandraId cassandraId = (CassandraId) mailboxId;
return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
.flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
- .next()
- .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new));
+ .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
+ .collectList();
}
private Mono<Pair<Flags, ComposedMessageIdWithMetaData>> updateFlags(Flags newState, MessageManager.FlagsUpdateMode updateMode, CassandraId cassandraId, ComposedMessageIdWithMetaData oldComposedId) {
diff --git a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
index a9dbaf5..bc34599 100644
--- a/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
+++ b/mailbox/memory/src/main/java/org/apache/james/mailbox/inmemory/mail/InMemoryMessageIdMapper.java
@@ -23,10 +23,7 @@ import static org.apache.james.mailbox.store.mail.AbstractMessageMapper.UNLIMITE
import java.util.Collection;
import java.util.List;
-import java.util.Map;
-import java.util.function.BinaryOperator;
import java.util.function.Function;
-import java.util.stream.Collectors;
import javax.mail.Flags;
@@ -47,11 +44,9 @@ import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import com.github.fge.lambdas.Throwing;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Multimap;
public class InMemoryMessageIdMapper implements MessageIdMapper {
- private static final BinaryOperator<UpdatedFlags> KEEP_FIRST = (p, q) -> p;
-
private final MailboxMapper mailboxMapper;
private final InMemoryMessageMapper messageMapper;
@@ -120,17 +115,16 @@ public class InMemoryMessageIdMapper implements MessageIdMapper {
}
@Override
- public Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds,
- Flags newState, FlagsUpdateMode updateMode) throws MailboxException {
+ public Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds,
+ Flags newState, FlagsUpdateMode updateMode) {
return find(ImmutableList.of(messageId), MessageMapper.FetchType.Metadata)
.stream()
.filter(message -> mailboxIds.contains(message.getMailboxId()))
.map(updateMessage(newState, updateMode))
.distinct()
- .collect(Guavate.toImmutableMap(
+ .collect(Guavate.toImmutableListMultimap(
Pair::getKey,
- Pair::getValue,
- KEEP_FIRST));
+ Pair::getValue));
}
private Function<MailboxMessage, Pair<MailboxId, UpdatedFlags>> updateMessage(Flags newState, FlagsUpdateMode updateMode) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
index 7f10620..7e37bf8 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageIdManager.java
@@ -74,6 +74,7 @@ import com.github.fge.lambdas.functions.ThrowingFunction;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import reactor.core.publisher.Flux;
@@ -117,9 +118,9 @@ public class StoreMessageIdManager implements MessageIdManager {
assertRightsOnMailboxes(mailboxIds, mailboxSession, Right.Write);
- Map<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace);
- for (Map.Entry<MailboxId, UpdatedFlags> entry : updatedFlags.entrySet()) {
- dispatchFlagsChange(mailboxSession, entry.getKey(), entry.getValue());
+ Multimap<MailboxId, UpdatedFlags> updatedFlags = messageIdMapper.setFlags(messageId, mailboxIds, newState, replace);
+ for (Map.Entry<MailboxId, Collection<UpdatedFlags>> entry : updatedFlags.asMap().entrySet()) {
+ dispatchFlagsChange(mailboxSession, entry.getKey(), ImmutableList.copyOf(entry.getValue()));
}
}
@@ -322,15 +323,15 @@ public class StoreMessageIdManager implements MessageIdManager {
}
}
- private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, UpdatedFlags updatedFlags) throws MailboxException {
- if (updatedFlags.flagsChanged()) {
+ private void dispatchFlagsChange(MailboxSession mailboxSession, MailboxId mailboxId, ImmutableList<UpdatedFlags> updatedFlags) throws MailboxException {
+ if (updatedFlags.stream().anyMatch(UpdatedFlags::flagsChanged)) {
Mailbox mailbox = mailboxSessionMapperFactory.getMailboxMapper(mailboxSession).findMailboxById(mailboxId);
eventBus.dispatch(EventFactory.flagsUpdated()
.randomEventId()
.mailboxSession(mailboxSession)
.mailbox(mailbox)
- .updatedFlag(updatedFlags)
+ .updatedFlags(updatedFlags)
.build(),
new MailboxIdRegistrationKey(mailboxId))
.block();
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
index 2f9457f..094ce72 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageIdMapper.java
@@ -20,7 +20,6 @@ package org.apache.james.mailbox.store.mail;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import javax.mail.Flags;
@@ -54,5 +53,14 @@ public interface MessageIdMapper {
.forEach(this::delete);
}
- Map<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException;
+ /**
+ * Updates the flags of the messages with the given MessageId in the supplied mailboxes
+ *
+ * More than one message can be updated when a message is contained several times in the same mailbox with distinct
+ * MessageUids.
+ *
+ * @return Metadata of the update, indexed by mailboxIds.
+ * @throws MailboxException
+ */
+ Multimap<MailboxId, UpdatedFlags> setFlags(MessageId messageId, List<MailboxId> mailboxIds, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException;
}
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
index 3de1f43..0c9fc61 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageIdMapperTest.java
@@ -60,6 +60,7 @@ import org.junit.jupiter.api.Test;
import com.github.steveash.guavate.Guavate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
+import com.google.common.collect.Multimap;
public abstract class MessageIdMapperTest {
private static final Username BENWA = Username.of("benwa");
@@ -367,7 +368,7 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
Flags newFlags = new Flags(Flag.ANSWERED);
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -376,7 +377,7 @@ public abstract class MessageIdMapperTest {
.oldFlags(new Flags())
.newFlags(newFlags)
.build();
- assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap()).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -394,7 +395,7 @@ public abstract class MessageIdMapperTest {
.add("userflag")
.build();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REPLACE);
ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -404,7 +405,8 @@ public abstract class MessageIdMapperTest {
.newFlags(newFlags)
.build();
- assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap())
+ .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -422,7 +424,7 @@ public abstract class MessageIdMapperTest {
.add("userflag")
.build();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), newFlags, FlagsUpdateMode.REMOVE);
ModSeq modSeq = mapperProvider.highestModSeq(benwaInboxMailbox);
UpdatedFlags expectedUpdatedFlags = UpdatedFlags.builder()
@@ -432,7 +434,8 @@ public abstract class MessageIdMapperTest {
.newFlags(new Flags(Flags.Flag.RECENT))
.build();
- assertThat(flags).contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap())
+ .contains(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -466,17 +469,17 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
Flags newFlags = new Flags(Flag.ANSWERED);
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(), newFlags, FlagsUpdateMode.REMOVE);
- assertThat(flags).isEmpty();
+ assertThat(flags.asMap()).isEmpty();
}
@Test
void setFlagsShouldReturnEmptyWhenMessageIdDoesntExist() throws Exception {
MessageId unknownMessageId = mapperProvider.generateMessageId();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(unknownMessageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
- assertThat(flags).isEmpty();
+ assertThat(flags.asMap()).isEmpty();
}
@Test
@@ -489,7 +492,7 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
Flags newFlags = new FlagsBuilder()
.add(Flag.RECENT)
@@ -502,7 +505,8 @@ public abstract class MessageIdMapperTest {
.oldFlags(initialFlags)
.newFlags(newFlags)
.build();
- assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags));
+ assertThat(flags.asMap())
+ .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)));
}
@Test
@@ -518,7 +522,7 @@ public abstract class MessageIdMapperTest {
MessageId messageId = message1.getMessageId();
Flags newFlags = new Flags(Flag.ANSWERED);
- Map<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
+ Multimap<MailboxId, UpdatedFlags> flags = sut.setFlags(messageId, ImmutableList.of(message1.getMailboxId(), message1InOtherMailbox.getMailboxId()), newFlags, FlagsUpdateMode.ADD);
ModSeq modSeqBenwaInboxMailbox = mapperProvider.highestModSeq(benwaInboxMailbox);
ModSeq modSeqBenwaWorkMailbox = mapperProvider.highestModSeq(benwaWorkMailbox);
@@ -534,8 +538,9 @@ public abstract class MessageIdMapperTest {
.oldFlags(new Flags())
.newFlags(newFlags)
.build();
- assertThat(flags).containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), expectedUpdatedFlags),
- MapEntry.entry(message1InOtherMailbox.getMailboxId(), expectedUpdatedFlags2));
+ assertThat(flags.asMap())
+ .containsOnly(MapEntry.entry(benwaInboxMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags)),
+ MapEntry.entry(message1InOtherMailbox.getMailboxId(), ImmutableList.of(expectedUpdatedFlags2)));
}
@Test
@@ -857,19 +862,19 @@ public abstract class MessageIdMapperTest {
message1.setFlags(flags);
sut.save(message1);
- Map<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(),
+ Multimap<MailboxId, UpdatedFlags> mailboxIdUpdatedFlagsMap = sut.setFlags(message1.getMessageId(),
ImmutableList.of(message1.getMailboxId()),
flags,
FlagsUpdateMode.ADD);
- assertThat(mailboxIdUpdatedFlagsMap)
+ assertThat(mailboxIdUpdatedFlagsMap.asMap())
.containsOnly(MapEntry.entry(message1.getMailboxId(),
- UpdatedFlags.builder()
+ ImmutableList.of(UpdatedFlags.builder()
.modSeq(modSeq)
.uid(message1.getUid())
.newFlags(flags)
.oldFlags(flags)
- .build()));
+ .build())));
}
@Test
@@ -943,6 +948,17 @@ public abstract class MessageIdMapperTest {
}
@Test
+ void setFlagsShouldReturnAllUp() throws Exception {
+ addMessageAndSetModSeq(benwaInboxMailbox, message1);
+ addMessageAndSetModSeq(benwaInboxMailbox, message1);
+
+ Multimap<MailboxId, UpdatedFlags> map = sut.setFlags(message1.getMessageId(), ImmutableList.of(message1.getMailboxId()), new Flags(Flag.ANSWERED), FlagsUpdateMode.ADD);
+
+ assertThat(map.values()).hasSize(2);
+ assertThat(map.asMap()).hasSize(1);
+ }
+
+ @Test
void deletesShouldUpdateUnreadCount() throws Exception {
message1.setUid(mapperProvider.generateMessageUid());
message1.setModSeq(mapperProvider.generateModSeq(benwaInboxMailbox));
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org