You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/10/16 03:54:33 UTC
[james-project] 01/06: JAMES-3428 Implement read-repairs for
mailbox counters
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 363801128637e23bd85ce2c6e105145ee405c0f5
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Oct 14 13:46:35 2020 +0700
JAMES-3428 Implement read-repairs for mailbox counters
---
.../james/mailbox/model/MailboxCounters.java | 2 +-
.../CassandraMailboxSessionMapperFactory.java | 6 ++-
.../cassandra/mail/CassandraMessageMapper.java | 43 ++++++++++++++-
.../mail/task/RecomputeMailboxCountersService.java | 6 +--
.../CassandraSubscriptionManagerTest.java | 3 ++
.../cassandra/mail/CassandraMessageMapperTest.java | 62 ++++++++++++++++++++++
6 files changed, 116 insertions(+), 6 deletions(-)
diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java
index 3cceed5..854264d 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/model/MailboxCounters.java
@@ -113,7 +113,7 @@ public class MailboxCounters {
return Sanitized.of(mailboxId, sanitizedCount, sanitizedUnseen);
}
- private boolean isValid() {
+ public boolean isValid() {
return count >= 0
&& unseen >= 0
&& count >= unseen;
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 3cd91fd..f0e5609 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -51,6 +51,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageMapper;
import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
+import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
import org.apache.james.mailbox.cassandra.user.CassandraSubscriptionMapper;
import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
import org.apache.james.mailbox.store.mail.AnnotationMapper;
@@ -94,6 +95,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
private final CassandraSchemaVersionManager versionManager;
private final CassandraUtils cassandraUtils;
+ private final RecomputeMailboxCountersService recomputeMailboxCountersService;
private final CassandraConfiguration cassandraConfiguration;
@Inject
@@ -107,6 +109,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
CassandraAttachmentOwnerDAO ownerDAO, CassandraACLMapper aclMapper,
CassandraUserMailboxRightsDAO userMailboxRightsDAO,
CassandraSchemaVersionManager versionManager,
+ RecomputeMailboxCountersService recomputeMailboxCountersService,
CassandraUtils cassandraUtils, CassandraConfiguration cassandraConfiguration) {
this.uidProvider = uidProvider;
this.modSeqProvider = modSeqProvider;
@@ -131,6 +134,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
this.versionManager = versionManager;
this.cassandraUtils = cassandraUtils;
this.ownerDAO = ownerDAO;
+ this.recomputeMailboxCountersService = recomputeMailboxCountersService;
this.cassandraConfiguration = cassandraConfiguration;
this.indexTableHandler = new CassandraIndexTableHandler(
mailboxRecentsDAO,
@@ -155,7 +159,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
indexTableHandler,
firstUnseenDAO,
deletedMessageDAO,
- cassandraConfiguration);
+ cassandraConfiguration, recomputeMailboxCountersService);
}
@Override
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 851db3d..b72d56d 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.cassandra.mail;
+import java.security.SecureRandom;
import java.time.Duration;
import java.util.Comparator;
import java.util.Iterator;
@@ -39,6 +40,7 @@ import org.apache.james.mailbox.MessageUid;
import org.apache.james.mailbox.ModSeq;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
import org.apache.james.mailbox.cassandra.mail.utils.FlagsUpdateStageResult;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.ComposedMessageId;
@@ -52,6 +54,7 @@ import org.apache.james.mailbox.store.FlagsUpdateCalculator;
import org.apache.james.mailbox.store.mail.MessageMapper;
import org.apache.james.mailbox.store.mail.model.MailboxMessage;
import org.apache.james.mailbox.store.mail.model.impl.SimpleMailboxMessage;
+import org.apache.james.task.Task;
import org.apache.james.util.ReactorUtils;
import org.apache.james.util.streams.Limit;
import org.slf4j.Logger;
@@ -85,6 +88,8 @@ public class CassandraMessageMapper implements MessageMapper {
private final AttachmentLoader attachmentLoader;
private final CassandraDeletedMessageDAO deletedMessageDAO;
private final CassandraConfiguration cassandraConfiguration;
+ private final RecomputeMailboxCountersService recomputeMailboxCountersService;
+ private final SecureRandom secureRandom;
public CassandraMessageMapper(CassandraUidProvider uidProvider, CassandraModSeqProvider modSeqProvider,
CassandraAttachmentMapper attachmentMapper,
@@ -92,7 +97,8 @@ public class CassandraMessageMapper implements MessageMapper {
CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMailboxCounterDAO mailboxCounterDAO,
CassandraMailboxRecentsDAO mailboxRecentDAO, CassandraApplicableFlagDAO applicableFlagDAO,
CassandraIndexTableHandler indexTableHandler, CassandraFirstUnseenDAO firstUnseenDAO,
- CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration) {
+ CassandraDeletedMessageDAO deletedMessageDAO, CassandraConfiguration cassandraConfiguration,
+ RecomputeMailboxCountersService recomputeMailboxCountersService) {
this.uidProvider = uidProvider;
this.modSeqProvider = modSeqProvider;
this.messageDAO = messageDAO;
@@ -106,6 +112,8 @@ public class CassandraMessageMapper implements MessageMapper {
this.applicableFlagDAO = applicableFlagDAO;
this.deletedMessageDAO = deletedMessageDAO;
this.cassandraConfiguration = cassandraConfiguration;
+ this.recomputeMailboxCountersService = recomputeMailboxCountersService;
+ this.secureRandom = new SecureRandom();
}
@Override
@@ -128,6 +136,18 @@ public class CassandraMessageMapper implements MessageMapper {
@Override
public Mono<MailboxCounters> getMailboxCountersReactive(Mailbox mailbox) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
+ return readMailboxCounters(mailboxId)
+ .flatMap(counters -> {
+ if (!counters.isValid()) {
+ return fixCounters(mailbox)
+ .then(readMailboxCounters(mailboxId));
+ }
+ return Mono.just(counters);
+ })
+ .doOnNext(counters -> readRepair(mailbox, counters));
+ }
+
+ public Mono<MailboxCounters> readMailboxCounters(CassandraId mailboxId) {
return mailboxCounterDAO.retrieveMailboxCounters(mailboxId)
.defaultIfEmpty(MailboxCounters.builder()
.mailboxId(mailboxId)
@@ -136,6 +156,27 @@ public class CassandraMessageMapper implements MessageMapper {
.build());
}
+ private void readRepair(Mailbox mailbox, MailboxCounters counters) {
+ if (shouldReadRepair(counters)) {
+ fixCounters(mailbox)
+ .subscribeOn(Schedulers.elastic())
+ .subscribe();
+ }
+ }
+
+ private Mono<Task.Result> fixCounters(Mailbox mailbox) {
+ return recomputeMailboxCountersService.recomputeMailboxCounter(
+ new RecomputeMailboxCountersService.Context(),
+ mailbox,
+ RecomputeMailboxCountersService.Options.trustMessageProjection());
+ }
+
+ private boolean shouldReadRepair(MailboxCounters counters) {
+ double readRepairChance = 0.1;
+ double ponderatedReadRepairChance = readRepairChance * (10.0 / counters.getUnseen());
+ return secureRandom.nextFloat() < Math.min(readRepairChance, ponderatedReadRepairChance);
+ }
+
@Override
public void delete(Mailbox mailbox, MailboxMessage message) {
ComposedMessageIdWithMetaData metaData = message.getComposedMessageIdWithMetaData();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index be1fbec..18716c5 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -122,7 +122,7 @@ public class RecomputeMailboxCountersService {
}
}
- static class Context {
+ public static class Context {
static class Snapshot {
private final long processedMailboxCount;
private final ImmutableList<CassandraId> failedMailboxes;
@@ -168,7 +168,7 @@ public class RecomputeMailboxCountersService {
private final AtomicLong processedMailboxCount;
private final ConcurrentLinkedDeque<CassandraId> failedMailboxes;
- Context() {
+ public Context() {
processedMailboxCount = new AtomicLong();
failedMailboxes = new ConcurrentLinkedDeque<>();
}
@@ -213,7 +213,7 @@ public class RecomputeMailboxCountersService {
});
}
- private Mono<Result> recomputeMailboxCounter(Context context, Mailbox mailbox, Options options) {
+ public Mono<Result> recomputeMailboxCounter(Context context, Mailbox mailbox, Options options) {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
Counter counter = new Counter(mailboxId);
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
index 781ef96..9919308 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/CassandraSubscriptionManagerTest.java
@@ -45,6 +45,7 @@ import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO;
import org.apache.james.mailbox.cassandra.mail.CassandraModSeqProvider;
import org.apache.james.mailbox.cassandra.mail.CassandraUidProvider;
import org.apache.james.mailbox.cassandra.mail.CassandraUserMailboxRightsDAO;
+import org.apache.james.mailbox.cassandra.mail.task.RecomputeMailboxCountersService;
import org.apache.james.mailbox.cassandra.modules.CassandraSubscriptionModule;
import org.apache.james.mailbox.store.StoreSubscriptionManager;
import org.junit.jupiter.api.BeforeEach;
@@ -88,6 +89,7 @@ class CassandraSubscriptionManagerTest implements SubscriptionManagerContract {
CassandraUidProvider uidProvider = null;
CassandraModSeqProvider modSeqProvider = null;
CassandraSchemaVersionManager versionManager = null;
+ RecomputeMailboxCountersService recomputeMailboxCountersService = null;
subscriptionManager = new StoreSubscriptionManager(
new CassandraMailboxSessionMapperFactory(
@@ -113,6 +115,7 @@ class CassandraSubscriptionManagerTest implements SubscriptionManagerContract {
aclMapper,
userMailboxRightsDAO,
versionManager,
+ recomputeMailboxCountersService,
CassandraUtils.WITH_DEFAULT_CONFIGURATION,
CassandraConfiguration.DEFAULT_CONFIGURATION));
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
index aab3f51..20de2e5 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapperTest.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.cassandra.mail;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.james.backends.cassandra.Scenario.Builder.fail;
import static org.assertj.core.api.Assertions.assertThat;
@@ -43,6 +44,8 @@ import org.apache.james.mailbox.store.mail.model.MapperProvider;
import org.apache.james.mailbox.store.mail.model.MessageMapperTest;
import org.apache.james.util.streams.Limit;
import org.assertj.core.api.SoftAssertions;
+import org.awaitility.Awaitility;
+import org.awaitility.Duration;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -353,4 +356,63 @@ class CassandraMessageMapperTest extends MessageMapperTest {
.hasSize(1);
}
}
+
+ @Nested
+ class ReadRepairsTesting {
+ @Test
+ void readingShouldEventuallyFixCountersInconsistencies(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+ FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE);
+ messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
+ // Expected count of unseen is 4 see MessageMapperTest::mailboxUnSeenCountShouldBeDecrementedAfterAMessageIsMarkedSeen
+
+ // Create an inconsistency
+ new CassandraMailboxCounterDAO(cassandra.getConf())
+ .incrementUnseenAndCount((CassandraId) benwaInboxMailbox.getMailboxId())
+ .block();
+
+ // 100 poll with a 0.1 probability to trigger read repair
+ Awaitility.await()
+ .pollInterval(new Duration(10, MILLISECONDS))
+ .atMost(Duration.ONE_SECOND)
+ .untilAsserted(() ->
+ assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(4));
+ }
+
+ @Test
+ void readingShouldEventuallyFixMissingCountersInconsistencies(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+ FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE);
+ messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
+ // Expected count of unseen is 4 see MessageMapperTest::mailboxUnSeenCountShouldBeDecrementedAfterAMessageIsMarkedSeen
+
+ // Create an inconsistency
+ new CassandraMailboxCounterDAO(cassandra.getConf())
+ .delete((CassandraId) benwaInboxMailbox.getMailboxId())
+ .block();
+
+ // 100 poll with a 0.1 probability to trigger read repair
+ Awaitility.await()
+ .pollInterval(new Duration(10, MILLISECONDS))
+ .atMost(Duration.ONE_SECOND)
+ .untilAsserted(() ->
+ assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(4));
+ }
+
+ @Test
+ void readingShouldFixInvalidCounters(CassandraCluster cassandra) throws MailboxException {
+ saveMessages();
+ FlagsUpdateCalculator newFlags = new FlagsUpdateCalculator(new Flags(Flags.Flag.SEEN), MessageManager.FlagsUpdateMode.REPLACE);
+ messageMapper.updateFlags(benwaInboxMailbox, message1.getUid(), newFlags);
+ // Expected count of unseen is 4 see MessageMapperTest::mailboxUnSeenCountShouldBeDecrementedAfterAMessageIsMarkedSeen
+
+ // Create an inconsistency
+ new CassandraMailboxCounterDAO(cassandra.getConf())
+ .incrementUnseen((CassandraId) benwaInboxMailbox.getMailboxId())
+ .repeat(5)
+ .blockLast();
+
+ assertThat(messageMapper.getMailboxCounters(benwaInboxMailbox).getUnseen()).isEqualTo(4);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org