You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/12/18 07:27:35 UTC

[james-project] 07/13: JAMES-3435 Make message read isolation configurable

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 65f0156378881ac43022fd13ebb2991ee6d37994
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Dec 10 09:24:38 2020 +0700

    JAMES-3435 Make message read isolation configurable
    
    Disabling should be considered experimental.
    
    If disabled, regular consistency level is used for
    read transactions for messages. This might result
    in stale reads as the system.paxos table will not
    be checked for latest updates. Better performance
    is expected by turning it off.
    
    Note that reads performed as part of write transactions
    are always performed with a strong consistency.
---
 .../init/configuration/CassandraConfiguration.java | 31 +++++++++++++++++++---
 .../pages/distributed/configure/cassandra.adoc     |  6 +++++
 .../mailbox/cassandra/DeleteMessageListener.java   |  4 +--
 .../cassandra/mail/CassandraMessageIdMapper.java   | 22 +++++++++++----
 .../mail/CassandraMessageIdToImapUidDAO.java       | 30 ++++++++++++++++++---
 .../cassandra/mail/CassandraMessageMapper.java     |  4 ++-
 .../mail/task/RecomputeMailboxCountersService.java |  4 ++-
 .../task/SolveMessageInconsistenciesService.java   |  7 ++---
 src/site/xdoc/server/config-cassandra.xml          |  7 +++++
 9 files changed, 96 insertions(+), 19 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
index 99ac1a7..ad46dd9 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
@@ -57,6 +57,7 @@ public class CassandraConfiguration {
     public static final String DEFAULT_CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = "SERIAL";
     public static final List<String> VALID_CONSISTENCY_LEVEL_REGULAR = ImmutableList.of("QUORUM", "LOCAL_QUORUM", "EACH_QUORUM");
     public static final List<String> VALID_CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = ImmutableList.of("SERIAL", "LOCAL_SERIAL");
+    public static final boolean DEFAULT_STRONG_CONSISTENCY = true;
 
     private static final String MAILBOX_READ_REPAIR = "mailbox.read.repair.chance";
     private static final String MAILBOX_MAX_COUNTERS_READ_REPAIR = "mailbox.counters.read.repair.chance.max";
@@ -72,6 +73,7 @@ public class CassandraConfiguration {
     private static final String BLOB_PART_SIZE = "mailbox.blob.part.size";
     private static final String ATTACHMENT_V2_MIGRATION_READ_TIMEOUT = "attachment.v2.migration.read.timeout";
     private static final String MESSAGE_ATTACHMENTID_READ_TIMEOUT = "message.attachmentids.read.timeout";
+    private static final String MESSAGE_READ_STRONG_CONSISTENCY = "message.read.strong.consistency";
     private static final String CONSISTENCY_LEVEL_REGULAR = "cassandra.consistency_level.regular";
     private static final String CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = "cassandra.consistency_level.lightweight_transaction";
 
@@ -94,6 +96,17 @@ public class CassandraConfiguration {
         private Optional<Float> mailboxReadRepair = Optional.empty();
         private Optional<Float> mailboxCountersReadRepairMax = Optional.empty();
         private Optional<Float> mailboxCountersReadRepairChanceOneHundred = Optional.empty();
+        private Optional<Boolean> messageReadStrongConsistency = Optional.empty();
+
+        public Builder messageReadStrongConsistency(boolean value) {
+            this.messageReadStrongConsistency = Optional.of(value);
+            return this;
+        }
+
+        public Builder messageReadStrongConsistency(Optional<Boolean> value) {
+            this.messageReadStrongConsistency = value;
+            return this;
+        }
 
         public Builder messageReadChunkSize(int value) {
             Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
@@ -300,7 +313,8 @@ public class CassandraConfiguration {
                 consistencyLevelLightweightTransaction,
                 mailboxReadRepair.orElse(DEFAULT_MAILBOX_READ_REPAIR),
                 mailboxCountersReadRepairMax.orElse(DEFAULT_MAX_MAILBOX_COUNTERS_READ_REPAIR_CHANCE),
-                mailboxCountersReadRepairChanceOneHundred.orElse(DEFAULT_ONE_HUNDRED_MAILBOX_COUNTERS_READ_REPAIR_CHANCE));
+                mailboxCountersReadRepairChanceOneHundred.orElse(DEFAULT_ONE_HUNDRED_MAILBOX_COUNTERS_READ_REPAIR_CHANCE),
+                messageReadStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY));
         }
     }
 
@@ -342,6 +356,8 @@ public class CassandraConfiguration {
                 propertiesConfiguration.getFloat(MAILBOX_MAX_COUNTERS_READ_REPAIR, null)))
             .mailboxCountersReadRepairChanceOneHundred(Optional.ofNullable(
                 propertiesConfiguration.getFloat(MAILBOX_ONE_HUNDRED_COUNTERS_READ_REPAIR, null)))
+            .messageReadStrongConsistency(Optional.ofNullable(
+                propertiesConfiguration.getBoolean(MESSAGE_READ_STRONG_CONSISTENCY, null)))
             .build();
     }
 
@@ -361,6 +377,7 @@ public class CassandraConfiguration {
     private final float mailboxReadRepair;
     private final float mailboxCountersReadRepairChanceMax;
     private final float mailboxCountersReadRepairChanceOneHundred;
+    private final boolean messageReadStrongConsistency;
 
     @VisibleForTesting
     CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
@@ -369,7 +386,7 @@ public class CassandraConfiguration {
                            int blobPartSize, final int attachmentV2MigrationReadTimeout, int messageAttachmentIdsReadTimeout,
                            String consistencyLevelRegular, String consistencyLevelLightweightTransaction,
                            float mailboxReadRepair, float mailboxCountersReadRepairChanceMax,
-                           float mailboxCountersReadRepairChanceOneHundred) {
+                           float mailboxCountersReadRepairChanceOneHundred, boolean messageReadStrongConsistency) {
         this.aclMaxRetry = aclMaxRetry;
         this.messageReadChunkSize = messageReadChunkSize;
         this.expungeChunkSize = expungeChunkSize;
@@ -386,6 +403,11 @@ public class CassandraConfiguration {
         this.mailboxReadRepair = mailboxReadRepair;
         this.mailboxCountersReadRepairChanceMax = mailboxCountersReadRepairChanceMax;
         this.mailboxCountersReadRepairChanceOneHundred = mailboxCountersReadRepairChanceOneHundred;
+        this.messageReadStrongConsistency = messageReadStrongConsistency;
+    }
+
+    public boolean isMessageReadStrongConsistency() {
+        return messageReadStrongConsistency;
     }
 
     public float getMailboxReadRepair() {
@@ -471,6 +493,7 @@ public class CassandraConfiguration {
                 && Objects.equals(this.blobPartSize, that.blobPartSize)
                 && Objects.equals(this.attachmentV2MigrationReadTimeout, that.attachmentV2MigrationReadTimeout)
                 && Objects.equals(this.messageAttachmentIdsReadTimeout, that.messageAttachmentIdsReadTimeout)
+                && Objects.equals(this.messageReadStrongConsistency, that.messageReadStrongConsistency)
                 && Objects.equals(this.consistencyLevelRegular, that.consistencyLevelRegular)
                 && Objects.equals(this.consistencyLevelLightweightTransaction, that.consistencyLevelLightweightTransaction);
         }
@@ -483,7 +506,8 @@ public class CassandraConfiguration {
             flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow,
             mailboxCountersReadRepairChanceOneHundred, mailboxCountersReadRepairChanceMax,
             blobPartSize, attachmentV2MigrationReadTimeout, messageAttachmentIdsReadTimeout,
-            consistencyLevelRegular, consistencyLevelLightweightTransaction, mailboxReadRepair);
+            consistencyLevelRegular, consistencyLevelLightweightTransaction, mailboxReadRepair,
+            messageReadStrongConsistency);
     }
 
     @Override
@@ -503,6 +527,7 @@ public class CassandraConfiguration {
             .add("blobPartSize", blobPartSize)
             .add("attachmentV2MigrationReadTimeout", attachmentV2MigrationReadTimeout)
             .add("messageAttachmentIdsReadTimeout", messageAttachmentIdsReadTimeout)
+            .add("messageReadStrongConsistency", messageReadStrongConsistency)
             .add("consistencyLevelRegular", consistencyLevelRegular)
             .add("consistencyLevelLightweightTransaction", consistencyLevelLightweightTransaction)
             .toString();
diff --git a/docs/modules/servers/pages/distributed/configure/cassandra.adoc b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
index 3439948..019b14b 100644
--- a/docs/modules/servers/pages/distributed/configure/cassandra.adoc
+++ b/docs/modules/servers/pages/distributed/configure/cassandra.adoc
@@ -185,4 +185,10 @@ Controls the number of messages to be retrieved in parallel.
 | Optional. Defaults to 102400 (100KB).
 Controls the size of blob parts used to store messages in the Cassandra blobStore.
 
+| message.read.strong.consistency
+| Optional. Boolean, defaults to true. Disabling should be considered experimental.
+If disabled, regular consistency level is used for read transactions for messages. This might result
+in stale reads as the system.paxos table will not be checked for latest updates. Better performance is expected
+by turning it off. Note that reads performed as part of write transactions are always performed with a strong
+consistency.
 |===
\ No newline at end of file
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index 7412ba4..3122449 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -237,13 +237,13 @@ public class DeleteMessageListener implements MailboxListener.GroupMailboxListen
     }
 
     private Mono<Boolean> isReferenced(CassandraMessageId id) {
-        return imapUidDAO.retrieve(id, ALL_MAILBOXES)
+        return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ReadConsistency.STRONG)
             .hasElements()
             .map(negate());
     }
 
     private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId excludedId) {
-        return imapUidDAO.retrieve(id, ALL_MAILBOXES)
+        return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ReadConsistency.STRONG)
             .filter(metadata -> !metadata.getComposedMessageId().getMailboxId().equals(excludedId))
             .hasElements()
             .map(negate());
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index c9925b9..19fe49b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -18,6 +18,8 @@
  ****************************************************************/
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ReadConsistency.STRONG;
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ReadConsistency.WEAK;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.time.Duration;
@@ -33,6 +35,7 @@ import org.apache.james.backends.cassandra.init.configuration.CassandraConfigura
 import org.apache.james.mailbox.MessageManager;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
 import org.apache.james.mailbox.cassandra.ids.CassandraMessageId;
+import org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ReadConsistency;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.exception.MailboxNotFoundException;
 import org.apache.james.mailbox.model.ComposedMessageId;
@@ -108,7 +111,8 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     @Override
     public Flux<MailboxMessage> findReactive(Collection<MessageId> messageIds, FetchType fetchType) {
         return Flux.fromIterable(messageIds)
-            .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty()), cassandraConfiguration.getMessageReadChunkSize())
+            .flatMap(messageId -> imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty(), chooseReadConsistency()),
+                cassandraConfiguration.getMessageReadChunkSize())
             .flatMap(composedMessageId -> messageDAOV3.retrieveMessage(composedMessageId, fetchType)
                 .switchIfEmpty(messageDAO.retrieveMessage(composedMessageId, fetchType))
                 .map(messageRepresentation -> Pair.of(composedMessageId, messageRepresentation)), cassandraConfiguration.getMessageReadChunkSize())
@@ -119,7 +123,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     @Override
     public Publisher<ComposedMessageIdWithMetaData> findMetadata(MessageId messageId) {
-        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty());
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty(), chooseReadConsistency());
     }
 
     private Flux<MailboxMessage> keepMessageIfMailboxExists(GroupedFlux<MailboxId, MailboxMessage> groupedFlux) {
@@ -136,13 +140,21 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     @Override
     public List<MailboxId> findMailboxes(MessageId messageId) {
-        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty())
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.empty(), chooseReadConsistency())
             .map(ComposedMessageIdWithMetaData::getComposedMessageId)
             .map(ComposedMessageId::getMailboxId)
             .collectList()
             .block();
     }
 
+    public ReadConsistency chooseReadConsistency() {
+        if (cassandraConfiguration.isMessageReadStrongConsistency()) {
+            return STRONG;
+        } else {
+            return WEAK;
+        }
+    }
+
     @Override
     public void save(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
@@ -208,7 +220,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     }
 
     private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
-        return imapUidDAO.retrieve(messageId, mailboxId)
+        return imapUidDAO.retrieve(messageId, mailboxId, STRONG)
             .flatMap(this::deleteIds, ReactorUtils.DEFAULT_CONCURRENCY)
             .then();
     }
@@ -270,7 +282,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId))
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), STRONG)
             .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
             .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
             .collectList();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index 4c6ee7c..2633fbf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -41,6 +41,7 @@ import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.MOD_SE
 import static org.apache.james.mailbox.cassandra.table.MessageIdToImapUid.TABLE_NAME;
 
 import java.util.Optional;
+import java.util.function.Function;
 
 import javax.inject.Inject;
 import javax.mail.Flags;
@@ -62,12 +63,28 @@ import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
 import com.datastax.driver.core.Statement;
 import com.datastax.driver.core.querybuilder.QueryBuilder;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableSet;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraMessageIdToImapUidDAO {
+    public enum ReadConsistency {
+        STRONG(CassandraConsistenciesConfiguration::getLightweightTransaction),
+        WEAK(CassandraConsistenciesConfiguration::getRegular);
+
+        private final Function<CassandraConsistenciesConfiguration, ConsistencyLevel> consistencyLevelChoice;
+
+
+        ReadConsistency(Function<CassandraConsistenciesConfiguration, ConsistencyLevel> consistencyLevelChoice) {
+            this.consistencyLevelChoice = consistencyLevelChoice;
+        }
+
+        public ConsistencyLevel choose(CassandraConsistenciesConfiguration configuration) {
+            return consistencyLevelChoice.apply(configuration);
+        }
+    }
 
     private static final String MOD_SEQ_CONDITION = "modSeqCondition";
 
@@ -79,13 +96,13 @@ public class CassandraMessageIdToImapUidDAO {
     private final PreparedStatement selectAll;
     private final PreparedStatement select;
     private final PreparedStatement listStatement;
-    private final ConsistencyLevel consistencyLevel;
+    private final CassandraConsistenciesConfiguration consistenciesConfiguration;
 
     @Inject
     public CassandraMessageIdToImapUidDAO(Session session, CassandraConsistenciesConfiguration consistenciesConfiguration,
                                           CassandraMessageId.Factory messageIdFactory) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
-        this.consistencyLevel = consistenciesConfiguration.getLightweightTransaction();
+        this.consistenciesConfiguration = consistenciesConfiguration;
         this.messageIdFactory = messageIdFactory;
         this.delete = prepareDelete(session);
         this.insert = prepareInsert(session);
@@ -196,13 +213,18 @@ public class CassandraMessageIdToImapUidDAO {
                 .setLong(MOD_SEQ_CONDITION, oldModSeq.asLong()));
     }
 
-    public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+    public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId, ReadConsistency readConsistency) {
         return cassandraAsyncExecutor.executeRows(
                     selectStatement(messageId, mailboxId)
-                    .setConsistencyLevel(consistencyLevel))
+                    .setConsistencyLevel(readConsistency.choose(consistenciesConfiguration)))
                 .map(this::toComposedMessageIdWithMetadata);
     }
 
+    @VisibleForTesting
+    public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
+        return retrieve(messageId, mailboxId, ReadConsistency.STRONG);
+    }
+
     public Flux<ComposedMessageIdWithMetaData> retrieveAllMessages() {
         return cassandraAsyncExecutor.executeRows(listStatement.bind())
             .map(row -> toComposedMessageIdWithMetadata(row));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index ffc0f36..5ced7bf 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ReadConsistency.STRONG;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.security.SecureRandom;
@@ -382,7 +383,8 @@ public class CassandraMessageMapper implements MessageMapper {
     private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> failed) {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
-                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId())), DEFAULT_CONCURRENCY);
+                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), STRONG),
+                    DEFAULT_CONCURRENCY);
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
             return Mono.empty();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
index 18716c5..aa88ef9 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/RecomputeMailboxCountersService.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra.mail.task;
 
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ReadConsistency.STRONG;
+
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -241,7 +243,7 @@ public class RecomputeMailboxCountersService {
         }
         CassandraMessageId messageId = (CassandraMessageId) message.getComposedMessageId().getMessageId();
 
-        return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId))
+        return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId), STRONG)
             .doOnNext(trustedMessage -> {
                 if (!trustedMessage.equals(message)) {
                     LOGGER.warn("Possible denormalization issue on {}. " +
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
index 49936ee..facd5d6 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMessageInconsistenciesService.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail.task;
 
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ReadConsistency.STRONG;
 import static org.apache.james.util.ReactorUtils.publishIfPresent;
 
 import java.time.Duration;
@@ -456,7 +457,7 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Mono<Inconsistency> detectOutdatedMessageIdEntry(CassandraId mailboxId, CassandraMessageId messageId, ComposedMessageIdWithMetaData messageIdRecord) {
-        return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId))
+        return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId), STRONG)
             .filter(Predicate.not(Predicate.isEqual(messageIdRecord)))
             .<Inconsistency>map(upToDateMessageFromImapUid -> new OutdatedMessageIdEntry(messageIdRecord, upToDateMessageFromImapUid))
             .next()
@@ -464,7 +465,7 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Mono<Inconsistency> detectOrphanImapUidEntry(ComposedMessageIdWithMetaData messageFromImapUid, CassandraId mailboxId, CassandraMessageId messageId) {
-        return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId))
+        return messageIdToImapUidDAO.retrieve(messageId, Optional.of(mailboxId), STRONG)
             .next()
             .<Inconsistency>map(OrphanImapUidEntry::new)
             .switchIfEmpty(Mono.just(NO_INCONSISTENCY));
@@ -481,7 +482,7 @@ public class SolveMessageInconsistenciesService {
     }
 
     private Mono<Inconsistency> detectInconsistencyInMessageId(ComposedMessageIdWithMetaData message) {
-        return messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId()))
+        return messageIdToImapUidDAO.retrieve((CassandraMessageId) message.getComposedMessageId().getMessageId(), Optional.of((CassandraId) message.getComposedMessageId().getMailboxId()), STRONG)
             .map(uidRecord -> NO_INCONSISTENCY)
             .next()
             .switchIfEmpty(detectOrphanMessageIdEntry(message))
diff --git a/src/site/xdoc/server/config-cassandra.xml b/src/site/xdoc/server/config-cassandra.xml
index d779602..6271984 100644
--- a/src/site/xdoc/server/config-cassandra.xml
+++ b/src/site/xdoc/server/config-cassandra.xml
@@ -149,6 +149,13 @@
         <dt><strong>mailbox.blob.part.size</strong></dt>
         <dd>Optional. Defaults to 102400 (100KB).<br/> Controls the size of blob parts used to store messages.</dd>
 
+        <dt><strong>message.read.strong.consistency</strong></dt>
+        <dd>Optional. Boolean, defaults to true. Disabling should be considered experimental.
+            If disabled, regular consistency level is used for read transactions for messages. This might result
+            in stale reads as the system.paxos table will not be checked for latest updates. Better performance is expected
+            by turning it off. Note that reads performed as part of write transactions are always performed with a strong
+            consistency.</dd>
+
         <dt><strong>Allows specifying the driver default consistency level.</strong></dt>
         <dt><strong>cassandra.consistency_level.regular</strong></dt>
         <dd>Optional. Defaults to QUORUM.<br/> <a href="https://docs.datastax.com/en/cassandra-oss/3.x/cassandra/dml/dmlConfigConsistency.html">QUORUM, LOCAL_QUORUM, or EACH_QUORUM</a>.</dd>


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org