You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ro...@apache.org on 2020/07/07 16:02:42 UTC

[james-project] 04/05: JAMES-3265 Flags updates should not infinite loop on denormalization issues

This is an automated email from the ASF dual-hosted git repository.

rouazana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit e1a4ab80a0144bb3cf4abd6ba50ed932ca6fc4d1
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Jul 7 12:38:24 2020 +0700

    JAMES-3265 Flags updates should not infinite loop on denormalization issues
    
    Use the table of truth upon retries; this prevents the flags operation from failing repeatedly.
    
    Note that the initial operation still uses the projection to speed up range reads.
---
 .../cassandra/mail/CassandraMessageMapper.java     |  9 +++-----
 .../mail/utils/FlagsUpdateStageResult.java         | 14 ++++++------
 .../cassandra/mail/CassandraMessageMapperTest.java | 26 ++++++++++++++++++++++
 .../mail/utils/FlagsUpdateStageResultTest.java     | 20 +++++++++++++----
 4 files changed, 52 insertions(+), 17 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 9fbf32c..fb94eff 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,8 +19,6 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
-import static org.apache.james.util.ReactorUtils.publishIfPresent;
-
 import java.time.Duration;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -333,11 +331,10 @@ public class CassandraMessageMapper implements MessageMapper {
         return globalResult;
     }
 
-    private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<MessageUid> failed) {
+    private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> failed) {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
-                .flatMap(uid -> messageIdDAO.retrieve(mailboxId, uid))
-                .handle(publishIfPresent());
+                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId())));
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
             return Mono.empty();
@@ -443,7 +440,7 @@ public class CassandraMessageMapper implements MessageMapper {
                         .newFlags(newFlags)
                         .build());
                 } else {
-                    return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId().getUid());
+                    return FlagsUpdateStageResult.fail(oldMetaData.getComposedMessageId());
                 }
             });
     }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java
index df2858f..ba3eafa 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResult.java
@@ -22,7 +22,7 @@ package org.apache.james.mailbox.cassandra.mail.utils;
 import java.util.List;
 import java.util.Objects;
 
-import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.UpdatedFlags;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -33,24 +33,24 @@ public class FlagsUpdateStageResult {
         return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of(updatedFlags));
     }
 
-    public static FlagsUpdateStageResult fail(MessageUid uid) {
-        return new FlagsUpdateStageResult(ImmutableList.of(uid), ImmutableList.of());
+    public static FlagsUpdateStageResult fail(ComposedMessageId ids) {
+        return new FlagsUpdateStageResult(ImmutableList.of(ids), ImmutableList.of());
     }
 
     public static FlagsUpdateStageResult none() {
         return new FlagsUpdateStageResult(ImmutableList.of(), ImmutableList.of());
     }
 
-    private final ImmutableList<MessageUid> failed;
+    private final ImmutableList<ComposedMessageId> failed;
     private final ImmutableList<UpdatedFlags> succeeded;
 
     @VisibleForTesting
-    FlagsUpdateStageResult(ImmutableList<MessageUid> failed, ImmutableList<UpdatedFlags> succeeded) {
+    FlagsUpdateStageResult(ImmutableList<ComposedMessageId> failed, ImmutableList<UpdatedFlags> succeeded) {
         this.failed = failed;
         this.succeeded = succeeded;
     }
 
-    public List<MessageUid> getFailed() {
+    public List<ComposedMessageId> getFailed() {
         return failed;
     }
 
@@ -60,7 +60,7 @@ public class FlagsUpdateStageResult {
 
     public FlagsUpdateStageResult merge(FlagsUpdateStageResult other) {
         return new FlagsUpdateStageResult(
-            ImmutableList.<MessageUid>builder()
+            ImmutableList.<ComposedMessageId>builder()
                 .addAll(this.failed)
                 .addAll(other.failed)
                 .build(),
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index 0e17312..75e9146 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -64,6 +64,32 @@ class CassandraMessageMapperTest extends MessageMapperTest {
     @Nested
     class StatementLimitationTests {
         @Test
+        void updateFlagsShouldNotRetryOnDeletedMessages(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+
+            cassandra.getConf().printStatements();
+            cassandra.getConf()
+                .registerScenario(fail()
+                    .forever()
+                    .whenQueryStartsWith("DELETE FROM messageIdTable WHERE mailboxId=:mailboxId AND uid=:uid;"));
+            try {
+                messageMapper.deleteMessages(benwaInboxMailbox, ImmutableList.of(message1.getUid(), message2.getUid(), message3.getUid()));
+            } catch (Exception e) {
+                // expected
+            }
+
+            StatementRecorder statementRecorder = new StatementRecorder();
+            cassandra.getConf().recordStatements(statementRecorder);
+
+            FlagsUpdateCalculator markAsRead = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.ADD);
+            messageMapper.updateFlags(benwaInboxMailbox, markAsRead, MessageRange.all());
+
+            assertThat(statementRecorder.listExecutedStatements(Selector.preparedStatement(
+                "UPDATE modseq SET nextModseq=:nextModseq WHERE mailboxId=:mailboxId IF nextModseq=:modSeqCondition;")))
+                .hasSize(2);
+        }
+
+        @Test
         void deleteMessagesShouldGroupMessageReads(CassandraCluster cassandra) throws MailboxException {
             saveMessages();
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java
index 3317b6b..923a9a1 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/utils/FlagsUpdateStageResultTest.java
@@ -21,29 +21,41 @@ package org.apache.james.mailbox.cassandra.mail.utils;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
+import java.util.UUID;
+
 import javax.mail.Flags;
 
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
+import org.apache.james.mailbox.cassandra.ids.CassandraId;
+import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.UpdatedFlags;
 import org.junit.jupiter.api.Test;
 
+import com.datastax.driver.core.utils.UUIDs;
 import com.google.common.collect.ImmutableList;
 
 import nl.jqno.equalsverifier.EqualsVerifier;
 
 class FlagsUpdateStageResultTest {
 
-    private static final MessageUid UID = MessageUid.of(1L);
-    private static final MessageUid OTHER_UID = MessageUid.of(2L);
+    private static final ComposedMessageId UID = new ComposedMessageId(
+        CassandraId.of(UUID.fromString("464765a0-e4e7-11e4-aba4-710c1de3782b")),
+        new CassandraMessageId.Factory().of(UUIDs.timeBased()),
+        MessageUid.of(1L));
+    private static final ComposedMessageId OTHER_UID = new ComposedMessageId(
+        CassandraId.of(UUID.fromString("464765a0-e4e7-11e4-aba4-710c1de3782b")),
+        new CassandraMessageId.Factory().of(UUIDs.timeBased()),
+        MessageUid.of(2L));
     private static final UpdatedFlags UPDATED_FLAGS = UpdatedFlags.builder()
-        .uid(UID)
+        .uid(UID.getUid())
         .modSeq(ModSeq.of(18))
         .oldFlags(new Flags())
         .newFlags(new Flags(Flags.Flag.SEEN))
         .build();
     private static final UpdatedFlags OTHER_UPDATED_FLAGS = UpdatedFlags.builder()
-        .uid(OTHER_UID)
+        .uid(OTHER_UID.getUid())
         .modSeq(ModSeq.of(18))
         .oldFlags(new Flags())
         .newFlags(new Flags(Flags.Flag.SEEN))


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org