You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/25 02:56:15 UTC

[james-project] 09/10: JAMES-3265 Implement a MessageMapper method to reset all recents

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 4003c37e9ffb87902124f8109ed4c1acd12e575c
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jun 23 14:11:21 2020 +0700

    JAMES-3265 Implement a MessageMapper method to reset all recents
    
    This enables a single modseq allocation for the entire operation,
    speeding things up significantly.
    
    Cassandra query analysis showed that a large portion of time was
    spent upon some IMAP SELECT operations updating modseqs.
---
 .../cassandra/mail/CassandraMessageMapper.java     | 21 ++++++++++-
 .../james/mailbox/store/StoreMessageManager.java   | 43 +++++++++++++---------
 .../james/mailbox/store/mail/MessageMapper.java    | 19 ++++++++++
 .../store/mail/model/MessageMapperTest.java        | 33 ++++++++++++++++-
 4 files changed, 97 insertions(+), 19 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 39a7757..82c9e6b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -35,6 +35,7 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.mailbox.ApplicableFlagBuilder;
 import org.apache.james.mailbox.FlagsBuilder;
+import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -294,12 +295,30 @@ public class CassandraMessageMapper implements MessageMapper {
 
         Flux<ComposedMessageIdWithMetaData> toBeUpdated = messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited());
 
+        return updateFlags(flagUpdateCalculator, mailboxId, toBeUpdated).iterator();
+    }
+
+    private List<UpdatedFlags> updateFlags(FlagsUpdateCalculator flagUpdateCalculator, CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated) {
         FlagsUpdateStageResult firstResult = runUpdateStage(mailboxId, toBeUpdated, flagUpdateCalculator).block();
         FlagsUpdateStageResult finalResult = handleUpdatesStagedRetry(mailboxId, flagUpdateCalculator, firstResult);
         if (finalResult.containsFailedResults()) {
             LOGGER.error("Can not update following UIDs {} for mailbox {}", finalResult.getFailed(), mailboxId.asUuid());
         }
-        return finalResult.getSucceeded().iterator();
+        return finalResult.getSucceeded();
+    }
+
+    @Override
+    public List<UpdatedFlags> resetRecent(Mailbox mailbox) {
+        CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
+
+        Flux<ComposedMessageIdWithMetaData> toBeUpdated = mailboxRecentDAO.getRecentMessageUidsInMailbox(mailboxId)
+            .collectList()
+            .flatMapMany(uids -> Flux.fromIterable(MessageRange.toRanges(uids)))
+            .concatMap(range -> messageIdDAO.retrieveMessages(mailboxId, range, Limit.unlimited()))
+            .filter(message -> message.getFlags().contains(Flag.RECENT));
+        FlagsUpdateCalculator calculator = new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE);
+
+        return updateFlags(calculator, mailboxId, toBeUpdated);
     }
 
     private FlagsUpdateStageResult handleUpdatesStagedRetry(CassandraId mailboxId, FlagsUpdateCalculator flagUpdateCalculator, FlagsUpdateStageResult firstResult) {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
index d2f6520..a90e81b 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/StoreMessageManager.java
@@ -656,30 +656,39 @@ public class StoreMessageManager implements MessageManager {
      * the recent flag on the messages for the uids
      */
     protected List<MessageUid> recent(final boolean reset, MailboxSession mailboxSession) throws MailboxException {
-        if (reset) {
-            if (!isWriteable(mailboxSession)) {
-                throw new ReadOnlyException(getMailboxPath());
-            }
-        }
-        final MessageMapper messageMapper = mapperFactory.getMessageMapper(mailboxSession);
+        MessageMapper messageMapper = mapperFactory.getMessageMapper(mailboxSession);
 
         return messageMapper.execute(() -> {
-            final List<MessageUid> members = messageMapper.findRecentMessageUidsInMailbox(getMailboxEntity());
-
-            // Convert to MessageRanges so we may be able to optimize the
-            // flag update
-            List<MessageRange> ranges = MessageRange.toRanges(members);
-            for (MessageRange range : ranges) {
-                if (reset) {
-                    // only call save if we need to
-                    messageMapper.updateFlags(getMailboxEntity(), new FlagsUpdateCalculator(new Flags(Flag.RECENT), FlagsUpdateMode.REMOVE), range);
-                }
+            if (reset) {
+                return resetRecents(messageMapper, mailboxSession);
             }
-            return members;
+            return messageMapper.findRecentMessageUidsInMailbox(getMailboxEntity());
         });
 
     }
 
+    private List<MessageUid> resetRecents(MessageMapper messageMapper, MailboxSession mailboxSession) throws MailboxException {
+        if (!isWriteable(mailboxSession)) {
+            throw new ReadOnlyException(getMailboxPath());
+        }
+
+        List<UpdatedFlags> updatedFlags = messageMapper.resetRecent(getMailboxEntity());
+
+        eventBus.dispatch(EventFactory.flagsUpdated()
+                .randomEventId()
+                .mailboxSession(mailboxSession)
+                .mailbox(getMailboxEntity())
+                .updatedFlags(updatedFlags)
+                .build(),
+            new MailboxIdRegistrationKey(mailbox.getMailboxId()))
+            .subscribeOn(Schedulers.elastic())
+            .block();
+
+        return updatedFlags.stream()
+            .map(UpdatedFlags::getUid)
+            .collect(Guavate.toImmutableList());
+    }
+
     private void runPredeletionHooks(List<MessageUid> uids, MailboxSession session) {
         MessageMapper messageMapper = mapperFactory.getMessageMapper(session);
 
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
index 2c79390..ece0c7d 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/mail/MessageMapper.java
@@ -18,6 +18,8 @@
  ****************************************************************/
 package org.apache.james.mailbox.store.mail;
 
+import static javax.mail.Flags.Flag.RECENT;
+
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +27,7 @@ import java.util.Optional;
 
 import javax.mail.Flags;
 
+import org.apache.james.mailbox.MessageManager.FlagsUpdateMode;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.exception.MailboxException;
@@ -39,6 +42,8 @@ import org.apache.james.mailbox.store.mail.model.Property;
 import org.apache.james.mailbox.store.transaction.Mapper;
 import org.apache.james.util.streams.Iterators;
 
+import com.google.common.collect.ImmutableList;
+
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
@@ -121,6 +126,20 @@ public interface MessageMapper extends Mapper {
      */
     Iterator<UpdatedFlags> updateFlags(Mailbox mailbox, FlagsUpdateCalculator flagsUpdateCalculator,
             final MessageRange set) throws MailboxException;
+
+    default List<UpdatedFlags> resetRecent(Mailbox mailbox) throws MailboxException {
+        final List<MessageUid> members = findRecentMessageUidsInMailbox(mailbox);
+        ImmutableList.Builder<UpdatedFlags> result = ImmutableList.builder();
+
+        FlagsUpdateCalculator calculator = new FlagsUpdateCalculator(new Flags(RECENT), FlagsUpdateMode.REMOVE);
+        // Convert to MessageRanges so we may be able to optimize the flag update
+        List<MessageRange> ranges = MessageRange.toRanges(members);
+        for (MessageRange range : ranges) {
+            result.addAll(updateFlags(mailbox, calculator, range));
+        }
+        return result.build();
+    }
+
     
     /**
      * Copy the given {@link MailboxMessage} to a new mailbox and return the uid of the copy. Be aware that the given uid is just a suggestion for the uid of the copied
diff --git a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
index 7f0d1ed..1d44644 100644
--- a/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
+++ b/mailbox/store/src/test/java/org/apache/james/mailbox/store/mail/model/MessageMapperTest.java
@@ -319,7 +319,38 @@ public abstract class MessageMapperTest {
         messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
         messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
         messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
-        assertThat(messageMapper.findRecentMessageUidsInMailbox(benwaInboxMailbox)).containsOnly(message2.getUid(), message4.getUid());
+        assertThat(messageMapper.findRecentMessageUidsInMailbox(benwaInboxMailbox))
+            .containsOnly(message2.getUid(), message4.getUid());
+    }
+
+    @Test
+    void resetRecentsShouldReturnEmptyListWhenNoMessagesMarkedAsRecentArePresentInMailbox() throws MailboxException {
+        assertThat(messageMapper.resetRecent(benwaInboxMailbox)).isEmpty();
+    }
+
+    @Test
+    void resetRecentsShouldRemoveAllRecentFlags() throws MailboxException {
+        saveMessages();
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
+        messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+
+        messageMapper.resetRecent(benwaInboxMailbox);
+
+        assertThat(messageMapper.findRecentMessageUidsInMailbox(benwaInboxMailbox))
+            .isEmpty();
+    }
+
+    @Test
+    void resetRecentsShouldReturnUpdatedFlags() throws MailboxException {
+        saveMessages();
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message2.getUid()));
+        messageMapper.updateFlags(benwaInboxMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message4.getUid()));
+        messageMapper.updateFlags(benwaWorkMailbox, new FlagsUpdateCalculator(new Flags(Flags.Flag.RECENT), FlagsUpdateMode.REPLACE), MessageRange.one(message6.getUid()));
+
+        assertThat(messageMapper.resetRecent(benwaInboxMailbox))
+            .extracting(UpdatedFlags::getUid)
+            .containsOnly(message2.getUid(), message4.getUid());
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org