You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/04/08 03:56:54 UTC

[james-project] 04/12: JAMES-3435 Use writeConsistency to determine read consistency needs upon writes.

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c490eeaff0af77d65a35e90e1f6b4684b9e322ad
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Mar 28 14:44:20 2021 +0700

    JAMES-3435 Use writeConsistency to determine read consistency needs upon writes.
---
 .../CassandraMailboxSessionMapperFactory.java        |  2 +-
 .../mailbox/cassandra/DeleteMessageListener.java     | 20 +++++++++++++++++---
 .../cassandra/mail/CassandraMessageIdMapper.java     | 11 +++++++++--
 .../cassandra/mail/CassandraMessageMapper.java       | 10 +++++++++-
 4 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 26a8a04..671b894 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -217,6 +217,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
     public DeleteMessageListener deleteMessageListener() {
         return new DeleteMessageListener(imapUidDAO, messageIdDAO, messageDAO, messageDAOV3, attachmentDAOV2, ownerDAO,
             attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
-            mailboxCounterDAO, mailboxRecentsDAO, blobStore);
+            mailboxCounterDAO, mailboxRecentsDAO, blobStore, cassandraConfiguration);
     }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index ddfe04b..d48623b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -19,6 +19,8 @@
 
 package org.apache.james.mailbox.cassandra;
 
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG;
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK;
 import static org.apache.james.util.FunctionalUtils.negate;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
@@ -27,6 +29,7 @@ import java.util.function.Predicate;
 
 import javax.inject.Inject;
 
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.blob.api.BlobStore;
 import org.apache.james.events.Event;
 import org.apache.james.events.EventListener;
@@ -95,13 +98,16 @@ public class DeleteMessageListener implements EventListener.GroupEventListener {
     private final CassandraMailboxCounterDAO counterDAO;
     private final CassandraMailboxRecentsDAO recentsDAO;
     private final BlobStore blobStore;
+    private final CassandraConfiguration cassandraConfiguration;
 
     @Inject
     public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
                                  CassandraMessageDAOV3 messageDAOV3, CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO,
                                  CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper,
                                  CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO,
-                                 CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore) {
+                                 CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO,
+                                 CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore,
+                                 CassandraConfiguration cassandraConfiguration) {
         this.imapUidDAO = imapUidDAO;
         this.messageIdDAO = messageIdDAO;
         this.messageDAO = messageDAO;
@@ -117,6 +123,7 @@ public class DeleteMessageListener implements EventListener.GroupEventListener {
         this.counterDAO = counterDAO;
         this.recentsDAO = recentsDAO;
         this.blobStore = blobStore;
+        this.cassandraConfiguration = cassandraConfiguration;
     }
 
     @Override
@@ -239,15 +246,22 @@ public class DeleteMessageListener implements EventListener.GroupEventListener {
     }
 
     private Mono<Boolean> isReferenced(CassandraMessageId id) {
-        return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG)
+        return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites())
             .hasElements()
             .map(negate());
     }
 
     private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId excludedId) {
-        return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG)
+        return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites())
             .filter(metadata -> !metadata.getComposedMessageId().getMailboxId().equals(excludedId))
             .hasElements()
             .map(negate());
     }
+
+    private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            return STRONG;
+        }
+        return WEAK;
+    }
 }
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 9cb0d5e..1b5f58e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -155,6 +155,13 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
         }
     }
 
+    private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            return STRONG;
+        }
+        return WEAK;
+    }
+
     @Override
     public void save(MailboxMessage mailboxMessage) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
@@ -220,7 +227,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
     }
 
     private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
-        return imapUidDAO.retrieve(messageId, mailboxId, STRONG)
+        return imapUidDAO.retrieve(messageId, mailboxId, chooseReadConsistencyUponWrites())
             .flatMap(this::deleteIds, ReactorUtils.DEFAULT_CONCURRENCY)
             .then();
     }
@@ -283,7 +290,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
 
     private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
         CassandraId cassandraId = (CassandraId) mailboxId;
-        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), STRONG)
+        return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), chooseReadConsistencyUponWrites())
             .flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
             .switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
             .collectList();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 68a7e72..faa3464 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -20,6 +20,7 @@
 package org.apache.james.mailbox.cassandra.mail;
 
 import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG;
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.security.SecureRandom;
@@ -409,7 +410,7 @@ public class CassandraMessageMapper implements MessageMapper {
     private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> failed) {
         if (!failed.isEmpty()) {
             Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
-                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), STRONG),
+                .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), chooseReadConsistencyUponWrites()),
                     DEFAULT_CONCURRENCY);
             return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
         } else {
@@ -417,6 +418,13 @@ public class CassandraMessageMapper implements MessageMapper {
         }
     }
 
+    private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() {
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            return STRONG;
+        }
+        return WEAK;
+    }
+
     private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
         return computeNewModSeq(mailboxId)
             .flatMapMany(newModSeq -> toBeUpdated

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org