You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 02:56:15 UTC
[james-project] 09/10: JAMES-3265 Impement a MessageMapper method
to reset all recents
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 4003c37e9ffb87902124f8109ed4c1acd12e575c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 23 14:11:21 2020 +0700
JAMES-3265 Impement a MessageMapper method to reset all recents
This enables a single modseq allocation for the entire operaton,
fasting significantly things up.
Cassandra query analysis showed that a large portion of time was
spent upon some IMAP SELECT operations updating modseqs.
---
.../cassandra/mail/CassandraMessageMapper.java | 21 ++++++++++-
.../james/mailbox/store/StoreMessageManager.java | 43 +++++++++++++---------
.../james/mailbox/store/mail/MessageMapper.java | 19 ++++++++++
.../store/mail/model/MessageMapperTest.java | 33 ++++++++++++++++-
4 files changed, 97 insertions(+), 19 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 39a7757..82c9e6b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.mailbox.ApplicableFlagBuilder;
import org.apache.james.mailbox.FlagsBuilder;
+import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -294,12 +295,30 @@ public class CassandraMessageMapper implements MessageMapper {
Flux<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited());
+ return updateFlags(flagUpdateCalculator, mailboxId, toBeUpdated).iterator();
+ }
+
+ private List<UpdatedFlags> updateFlags(FlagsUpdateCalculator flagUpdateCalculator, CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated) {
FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator).block();
FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult);
if (finalResult.containsFailedResults()) {
LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), mailboxId.asUuid());
}
- return finalResult.getSucceeded().iterator();
+ return finalResult.getSucceeded();
+ }
+
+ @Override
+ public List<UpdatedFlags> resetRecent(Mailbox mailbox) {
+ CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
+
+ Flux<ComposedMessageIdWithMetaData> toBeUpdated = mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
+ .collectList()
+ .flatMapMany(uids -> Flux.fromIterable(MessageRange.toRanges(uids)))
+ .concatMap(range -> messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()))
+ .filter(message -> message.getFlags().contains(Flag.RECENT));
+ FlagsUpdateCalculator calculator = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
+
+ return updateFlags(calculator, mailboxId, toBeUpdated);
}
private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId mailboxId, FlagsUpdateCalculator flagUpdateCalculator, FlagsUpdateStageResult firstResult) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index d2f6520..a90e81b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -656,30 +656,39 @@ public class StoreMessageManager implements MessageManager {
* the recent flag on the messages for the uids
*/
protected List<MessageUid> recent(final boolean reset, MailboxSession mailboxSession) throws MailboxException {
- if (reset) {
- if (!isWriteable(mailboxSession)) {
- throw new ReadOnlyException(getMailboxPath());
- }
- }
- final MessageMapper messageMapper = mapperFactory.getMessageMapper(mailboxSession);
+ MessageMapper messageMapper = mapperFactory.getMessageMapper(mailboxSession);
return messageMapper.execute(() -> {
- final List<MessageUid> members = messageMapper.findRecentMessageUidsInMailbox(getMailboxEntity());
-
- // Convert to MessageRanges so we may be able to optimize the
- // flag update
- List<MessageRange> ranges = MessageRange.toRanges(members);
- for (MessageRange range : ranges) {
- if (reset) {
- // only call save if we need to
- messageMapper.updateFlags(getMailboxEntity(), new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE), range);
- }
+ if (reset) {
+ return resetRecents(messageMapper, mailboxSession);
}
- return members;
+ return messageMapper.findRecentMessageUidsInMailbox(getMailboxEntity());
});
}
+ private List<MessageUid> resetRecents(MessageMapper messageMapper, MailboxSession mailboxSession) throws MailboxException {
+ if (!isWriteable(mailboxSession)) {
+ throw new ReadOnlyException(getMailboxPath());
+ }
+
+ List<UpdatedFlags> updatedFlags = messageMapper.resetRecent(getMailboxEntity());
+
+ eventBus.dispatch(EventFactory.flagsUpdated()
+ .randomEventId()
+ .mailboxSession(mailboxSession)
+ .mailbox(getMailboxEntity())
+ .updatedFlags(updatedFlags)
+ .build(),
+ new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+ .subscribeOn(Schedulers.elastic())
+ .block();
+
+ return updatedFlags.stream()
+ .map(UpdatedFlags::getUid)
+ .collect(Guavate.toImmutableList());
+ }
+
private void runPredeletionHooks(List<MessageUid> uids, MailboxSession session) {
MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
index 2c79390..ece0c7d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
@@ -18,6 +18,8 @@
****************************************************************/
package org.apache.james.mailbox.store.mail;
+import static javax.mail.Flags.Flag.RECENT;
+
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -25,6 +27,7 @@ import java.util.Optional;
import javax.mail.Flags;
+import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.exception.MailboxException;
@@ -39,6 +42,8 @@ import org.apache.james.mailbox.store.mail.model.Property;
import org.apache.james.mailbox.store.transaction.Mapper;
import org.apache.james.util.streams.Iterators;
+import com.google.common.collect.ImmutableList;
+
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -121,6 +126,20 @@ public interface MessageMapper extends Mapper {
*/
Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator,
final MessageRange set) throws MailboxException;
+
+ default List<UpdatedFlags> resetRecent(Mailbox mailbox) throws MailboxException {
+ final List<MessageUid> members = findRecentMessageUidsInMailbox(mailbox);
+ ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
+
+ FlagsUpdateCalculator calculator = new FlagsUpdateCalculator(new Flags(RECENT), FlagsUpdateMode.REMOVE);
+ // Convert to MessageRanges so we may be able to optimize the flag update
+ List<MessageRange> ranges = MessageRange.toRanges(members);
+ for (MessageRange range : ranges) {
+ result.addAll(updateFlags(mailbox, calculator, range));
+ }
+ return result.build();
+ }
+
/**
* Copy the given {@link MailboxMessage} to a new mailbox and return the uid of the copy. Be aware that the given uid is just a suggestion for the uid of the copied
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 7f0d1ed..1d44644 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -319,7 +319,38 @@ public abstract class MessageMapperTest {
messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
- assertThat(messageMapper.findRecentMessageUidsInMailbox(benwaInboxMailbox)).containsOnly(message2.getUid(), message4.getUid());
+ assertThat(messageMapper.findRecentMessageUidsInMailbox(benwaInboxMailbox))
+ .containsOnly(message2.getUid(), message4.getUid());
+ }
+
+ @Test
+ void resetRecentsShouldReturnEmptyListWhenNoMessagesMarkedAsRecentArePresentInMailbox() throws MailboxException {
+ assertThat(messageMapper.resetRecent(benwaInboxMailbox)).isEmpty();
+ }
+
+ @Test
+ void resetRecentsShouldRemoveAllRecentFlags() throws MailboxException {
+ saveMessages();
+ messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
+ messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
+ messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+
+ messageMapper.resetRecent(benwaInboxMailbox);
+
+ assertThat(messageMapper.findRecentMessageUidsInMailbox(benwaInboxMailbox))
+ .isEmpty();
+ }
+
+ @Test
+ void resetRecentsShouldReturnUpdatedFlags() throws MailboxException {
+ saveMessages();
+ messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
+ messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
+ messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+
+ assertThat(messageMapper.resetRecent(benwaInboxMailbox))
+ .extracting(UpdatedFlags::getUid)
+ .containsOnly(message2.getUid(), message4.getUid());
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org