You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/14 02:31:30 UTC

[james-project] 04/22: JAMES-3277 Avoid a double message read upon CassandraMessageIdMapper::setFlags

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 8b8183153070fedf984954d12f14a30aea16e491
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Oct 12 12:20:27 2020 +0700

    JAMES-3277 Avoid a double message read upon CassandraMessageIdMapper::setFlags
    
    The not-found case was handled via an extra read, but we can instead rely on smarter
    error handling on the reads in updateFlags to detect it.
---
 .../cassandra/mail/CassandraMessageIdMapper.java   | 25 +++++----------------
 .../mail/CassandraMessageIdMapperTest.java         | 26 ++++++++++++++++++++++
 2 files changed, 31 insertions(+), 20 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index b0293d1..771c276 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -101,7 +101,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     @Override
     public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
-        return Flux.fromStream(messageIds.stream())
+        return Flux.fromIterable(messageIds)
             .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
             .flatMap(composedMessageId -> messageDAO.retrieveMessage(composedMessageId, fetchType)
                 .map(messageRepresentation -> Pair.of(composedMessageId, messageRepresentation)), cassandraConfiguration.getMessageReadChunkSize())
@@ -220,21 +220,14 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         return Flux.fromIterable(mailboxIds)
             .distinct()
             .map(mailboxId -> (CassandraId) mailboxId)
-            .filterWhen(mailboxId -> haveMetaData(messageId, mailboxId))
             .concatMap(mailboxId -> flagsUpdateWithRetry(newState, updateMode, mailboxId, messageId))
             .flatMap(this::updateCounts)
             .collect(Guavate.toImmutableListMultimap(Pair::getLeft, Pair::getRight))
             .block();
     }
 
-    private Mono<Boolean> haveMetaData(MessageId messageId, CassandraId mailboxId) {
-        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(mailboxId))
-            .hasElements();
-    }
-
     private Flux<Pair<MailboxId, UpdatedFlags>> flagsUpdateWithRetry(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
-        return Mono.defer(() -> tryFlagsUpdate(newState, updateMode, mailboxId, messageId))
-            .single()
+        return Mono.defer(() -> updateFlags(mailboxId, messageId, newState, updateMode))
             .retry(cassandraConfiguration.getFlagsUpdateMessageIdMaxRetry())
             .onErrorResume(MailboxDeleteDuringUpdateException.class, e -> {
                 LOGGER.info("Mailbox {} was deleted during flag update", mailboxId);
@@ -261,16 +254,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
             .thenReturn(pair);
     }
 
-    private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> tryFlagsUpdate(Flags newState, MessageManager.FlagsUpdateMode updateMode, MailboxId mailboxId, MessageId messageId) {
-        try {
-            return updateFlags(mailboxId, messageId, newState, updateMode);
-        } catch (MailboxException e) {
-            LOGGER.error("Error while updating flags on mailbox: {}", mailboxId);
-            return Mono.empty();
-        }
-    }
-
-    private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) throws MailboxException {
+    private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
         CassandraId cassandraId = (CassandraId) mailboxId;
         return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
             .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId))
@@ -300,6 +284,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         return imapUidDAO.updateMetadata(newComposedId, oldComposedId.getModSeq())
             .filter(FunctionalUtils.identityPredicate())
             .flatMap(any -> messageIdDAO.updateMetadata(newComposedId)
-                .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)));
+                .thenReturn(Pair.of(oldComposedId.getFlags(), newComposedId)))
+            .single();
     }
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
index c196c71..fae47aa 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapperTest.java
@@ -85,6 +85,32 @@ class CassandraMessageIdMapperTest extends MessageIdMapperTest {
             .containsOnly(message1, message2, message3, message4);
     }
 
+    @Test
+    void setFlagsShouldMinimizeMessageReads(CassandraCluster cassandra) throws Exception {
+        CassandraMessageId.Factory messageIdFactory = new CassandraMessageId.Factory();
+        CassandraMailboxSessionMapperFactory mapperFactory = TestCassandraMailboxSessionMapperFactory.forTests(
+            cassandraCluster.getCassandraCluster(),
+            messageIdFactory,
+            CassandraConfiguration.builder()
+                .messageReadChunkSize(3)
+                .build());
+
+        saveMessages();
+
+        StatementRecorder statementRecorder = new StatementRecorder();
+        cassandra.getConf().recordStatements(statementRecorder);
+
+        mapperFactory.getMessageIdMapper(MAILBOX_SESSION).setFlags(message1.getMessageId(),
+            ImmutableList.of(message1.getMailboxId()),
+            new Flags(Flags.Flag.DELETED),
+            MessageManager.FlagsUpdateMode.REPLACE);
+
+        assertThat(statementRecorder.listExecutedStatements(
+            StatementRecorder.Selector.preparedStatementStartingWith("SELECT messageId,mailboxId,uid,modSeq,flagAnswered,flagDeleted," +
+                "flagDraft,flagFlagged,flagRecent,flagSeen,flagUser,userFlags FROM imapUidTable")))
+            .hasSize(1);
+    }
+
     @Nested
     class FailureTest {
         @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org