You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/10/16 03:54:33 UTC

[james-project] 01/06: JAMES-3428 Implement read-repairs for mailbox counters

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 363801128637e23bd85ce2c6e105145ee405c0f5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Oct 14 13:46:35 2020 +0700

    JAMES-3428 Implement read-repairs for mailbox counters
---
 .../james/mailbox/model/MailboxCounters.java       |  2 +-
 .../CassandraMailboxSessionMapperFactory.java      |  6 ++-
 .../cassandra/mail/CassandraMessageMapper.java     | 43 ++++++++++++++-
 .../mail/task/RecomputeMailboxCountersService.java |  6 +--
 .../CassandraSubscriptionManagerTest.java          |  3 ++
 .../cassandra/mail/CassandraMessageMapperTest.java | 62 ++++++++++++++++++++++
 6 files changed, 116 insertions(+), 6 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java
index 3cceed5..854264d 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java
@@ -113,7 +113,7 @@ public class MailboxCounters {
         return Sanitized.of(mailboxId, sanitizedCount, sanitizedUnseen);
     }
 
-    private boolean isValid() {
+    public boolean isValid() {
         return count >= 0
             && unseen >= 0
             && count >= unseen;
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 3cd91fd..f0e5609 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -51,6 +51,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageMapper;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
+import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
 import org.apache.james.mailbox.cassandra.user.CassandraSubscriptionMapper;
 import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
 import org.apache.james.mailbox.store.mail.AnnotationMapper;
@@ -94,6 +95,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
     private final CassandraSchemaVersionManager versionManager;
     private final CassandraUtils cassandraUtils;
+    private final RecomputeMailboxCountersService recomputeMailboxCountersService;
     private final CassandraConfiguration cassandraConfiguration;
 
     @Inject
@@ -107,6 +109,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
                                                 CassandraAttachmentOwnerDAO ownerDAO, CassandraACLMapper aclMapper,
                                                 CassandraUserMailboxRightsDAO userMailboxRightsDAO,
                                                 CassandraSchemaVersionManager versionManager,
+                                                RecomputeMailboxCountersService recomputeMailboxCountersService,
                                                 CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
@@ -131,6 +134,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
         this.versionManager = versionManager;
         this.cassandraUtils = cassandraUtils;
         this.ownerDAO = ownerDAO;
+        this.recomputeMailboxCountersService = recomputeMailboxCountersService;
         this.cassandraConfiguration = cassandraConfiguration;
         this.indexTableHandler = new CassandraIndexTableHandler(
             mailboxRecentsDAO,
@@ -155,7 +159,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
                                           indexTableHandler,
                                           firstUnseenDAO,
                                           deletedMessageDAO,
-                                          cassandraConfiguration);
+                                          cassandraConfiguration, recomputeMailboxCountersService);
     }
 
     @Override
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 851db3d..b72d56d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import java.security.SecureRandom;
 import java.time.Duration;
 import java.util.Comparator;
 import java.util.Iterator;
@@ -39,6 +40,7 @@ import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.ModSeq;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
 import org.apache.james.mailbox.cassandra.mail.utils.FlagsUpdateStageResult;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.ComposedMessageId;
@@ -52,6 +54,7 @@ import org.apache.james.mailbox.store.FlagsUpdateCalculator;
 import org.apache.james.mailbox.store.mail.MessageMapper;
 import org.apache.james.mailbox.store.mail.model.MailboxMessage;
 import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.task.Task;
 import org.apache.james.util.ReactorUtils;
 import org.apache.james.util.streams.Limit;
 import org.slf4j.Logger;
@@ -85,6 +88,8 @@ public class CassandraMessageMapper implements MessageMapper {
     private final AttachmentLoader attachmentLoader;
     private final CassandraDeletedMessageDAO deletedMessageDAO;
     private final CassandraConfiguration cassandraConfiguration;
+    private final RecomputeMailboxCountersService recomputeMailboxCountersService;
+    private final SecureRandom secureRandom;
 
     public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider,
                                   CassandraAttachmentMapper attachmentMapper,
@@ -92,7 +97,8 @@ public class CassandraMessageMapper implements MessageMapper {
                                   CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO,
                                   CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO,
                                   CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO,
-                                  CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration) {
+                                  CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration,
+                                  RecomputeMailboxCountersService recomputeMailboxCountersService) {
         this.uidProvider = uidProvider;
         this.modSeqProvider = modSeqProvider;
         this.messageDAO = messageDAO;
@@ -106,6 +112,8 @@ public class CassandraMessageMapper implements MessageMapper {
         this.applicableFlagDAO = applicableFlagDAO;
         this.deletedMessageDAO = deletedMessageDAO;
         this.cassandraConfiguration = cassandraConfiguration;
+        this.recomputeMailboxCountersService = recomputeMailboxCountersService;
+        this.secureRandom = new SecureRandom();
     }
 
     @Override
@@ -128,6 +136,18 @@ public class CassandraMessageMapper implements MessageMapper {
     @Override
     public Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
+        return readMailboxCounters(mailboxId)
+            .flatMap(counters -> {
+                if (!counters.isValid()) {
+                    return fixCounters(mailbox)
+                        .then(readMailboxCounters(mailboxId));
+                }
+                return Mono.just(counters);
+            })
+            .doOnNext(counters -> readRepair(mailbox, counters));
+    }
+
+    public Mono<MailboxCounters> readMailboxCounters(CassandraId mailboxId) {
         return mailboxCounterDAO.retrieveMailboxCounters(mailboxId)
             .defaultIfEmpty(MailboxCounters.builder()
                 .mailboxId(mailboxId)
@@ -136,6 +156,27 @@ public class CassandraMessageMapper implements MessageMapper {
                 .build());
     }
 
+    private void readRepair(Mailbox mailbox, MailboxCounters counters) {
+        if (shouldReadRepair(counters)) {
+            fixCounters(mailbox)
+                .subscribeOn(Schedulers.elastic())
+                .subscribe();
+        }
+    }
+
+    private Mono<Task.Result> fixCounters(Mailbox mailbox) {
+        return recomputeMailboxCountersService.recomputeMailboxCounter(
+                new RecomputeMailboxCountersService.Context(),
+                mailbox,
+                RecomputeMailboxCountersService.Options.trustMessageProjection());
+    }
+
+    private boolean shouldReadRepair(MailboxCounters counters) {
+        double readRepairChance = 0.1;
+        double ponderatedReadRepairChance = readRepairChance * (10.0 / counters.getUnseen());
+        return secureRandom.nextFloat() < Math.min(readRepairChance, ponderatedReadRepairChance);
+    }
+
     @Override
     public void delete(Mailbox mailbox, MailboxMessage message) {
         ComposedMessageIdWithMetaData metaData = message.getComposedMessageIdWithMetaData();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index be1fbec..18716c5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -122,7 +122,7 @@ public class RecomputeMailboxCountersService {
         }
     }
 
-    static class Context {
+    public static class Context {
         static class Snapshot {
             private final long processedMailboxCount;
             private final ImmutableList<CassandraId> failedMailboxes;
@@ -168,7 +168,7 @@ public class RecomputeMailboxCountersService {
         private final AtomicLong processedMailboxCount;
         private final ConcurrentLinkedDeque<CassandraId> failedMailboxes;
 
-        Context() {
+        public Context() {
             processedMailboxCount = new AtomicLong();
             failedMailboxes = new ConcurrentLinkedDeque<>();
         }
@@ -213,7 +213,7 @@ public class RecomputeMailboxCountersService {
             });
     }
 
-    private Mono<Result> recomputeMailboxCounter(Context context, Mailbox mailbox, Options options) {
+    public Mono<Result> recomputeMailboxCounter(Context context, Mailbox mailbox, Options options) {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
         Counter counter = new Counter(mailboxId);
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
index 781ef96..9919308 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
 import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
 import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
+import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
 import org.apache.james.mailbox.cassandra.modules.CassandraSubscriptionModule;
 import org.apache.james.mailbox.store.StoreSubscriptionManager;
 import org.junit.jupiter.api.BeforeEach;
@@ -88,6 +89,7 @@ class CassandraSubscriptionManagerTest implements SubscriptionManagerContract {
         CassandraUidProvider uidProvider = null;
         CassandraModSeqProvider modSeqProvider = null;
         CassandraSchemaVersionManager versionManager = null;
+        RecomputeMailboxCountersService recomputeMailboxCountersService = null;
 
         subscriptionManager = new StoreSubscriptionManager(
             new CassandraMailboxSessionMapperFactory(
@@ -113,6 +115,7 @@ class CassandraSubscriptionManagerTest implements SubscriptionManagerContract {
                 aclMapper,
                 userMailboxRightsDAO,
                 versionManager,
+                recomputeMailboxCountersService,
                 CassandraUtils.WITH_DEFAULT_CONFIGURATION,
                 CassandraConfiguration.DEFAULT_CONFIGURATION));
     }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index aab3f51..20de2e5 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -43,6 +44,8 @@ import org.apache.james.mailbox.store.mail.model.MapperProvider;
 import org.apache.james.mailbox.store.mail.model.MessageMapperTest;
 import org.apache.james.util.streams.Limit;
 import org.assertj.core.api.SoftAssertions;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
 import org.junit.jupiter.api.Nested;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
@@ -353,4 +356,63 @@ class CassandraMessageMapperTest extends MessageMapperTest {
                 .hasSize(1);
         }
     }
+
+    @Nested
+    class ReadRepairsTesting {
+        @Test
+        void readingShouldEventuallyFixCountersInconsistencies(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+            FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE);
+            messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
+            // Expected unseen count is 4; see MessageMapperTest::mailboxUnSeenCountShouldBeDecrementedAfterAMessageIsMarkedSeen
+
+            // Create an inconsistency
+            new CassandraMailboxCounterDAO(cassandra.getConf())
+                .incrementUnseenAndCount((CassandraId) benwaInboxMailbox.getMailboxId())
+                .block();
+
+            // Up to 100 polls (every 10 ms for at most 1 s), each with a 0.1 probability of triggering read repair
+            Awaitility.await()
+                .pollInterval(new Duration(10, MILLISECONDS))
+                .atMost(Duration.ONE_SECOND)
+                .untilAsserted(() ->
+                    assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(4));
+        }
+
+        @Test
+        void readingShouldEventuallyFixMissingCountersInconsistencies(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+            FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE);
+            messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
+            // Expected unseen count is 4; see MessageMapperTest::mailboxUnSeenCountShouldBeDecrementedAfterAMessageIsMarkedSeen
+
+            // Create an inconsistency
+            new CassandraMailboxCounterDAO(cassandra.getConf())
+                .delete((CassandraId) benwaInboxMailbox.getMailboxId())
+                .block();
+
+            // Up to 100 polls (every 10 ms for at most 1 s), each with a 0.1 probability of triggering read repair
+            Awaitility.await()
+                .pollInterval(new Duration(10, MILLISECONDS))
+                .atMost(Duration.ONE_SECOND)
+                .untilAsserted(() ->
+                    assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(4));
+        }
+
+        @Test
+        void readingShouldFixInvalidCounters(CassandraCluster cassandra) throws MailboxException {
+            saveMessages();
+            FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE);
+            messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
+            // Expected unseen count is 4; see MessageMapperTest::mailboxUnSeenCountShouldBeDecrementedAfterAMessageIsMarkedSeen
+
+            // Create an inconsistency: repeatedly increment unseen only, so the counters presumably become invalid (unseen > count)
+            new CassandraMailboxCounterDAO(cassandra.getConf())
+                .incrementUnseen((CassandraId) benwaInboxMailbox.getMailboxId())
+                .repeat(5)
+                .blockLast();
+
+            assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(4);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org