You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/04/08 03:56:54 UTC
[james-project] 04/12: JAMES-3435 Use writeConsistency to determine read consistency needs upon writes.
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit c490eeaff0af77d65a35e90e1f6b4684b9e322ad
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Mar 28 14:44:20 2021 +0700
JAMES-3435 Use writeConsistency to determine read consistency needs upon writes.
---
.../CassandraMailboxSessionMapperFactory.java | 2 +-
.../mailbox/cassandra/DeleteMessageListener.java | 20 +++++++++++++++++---
.../cassandra/mail/CassandraMessageIdMapper.java | 11 +++++++++--
.../cassandra/mail/CassandraMessageMapper.java | 10 +++++++++-
4 files changed, 36 insertions(+), 7 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 26a8a04..671b894 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -217,6 +217,6 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
public DeleteMessageListener deleteMessageListener() {
return new DeleteMessageListener(imapUidDAO, messageIdDAO, messageDAO, messageDAOV3, attachmentDAOV2, ownerDAO,
attachmentMessageIdDAO, aclMapper, userMailboxRightsDAO, applicableFlagDAO, firstUnseenDAO, deletedMessageDAO,
- mailboxCounterDAO, mailboxRecentsDAO, blobStore);
+ mailboxCounterDAO, mailboxRecentsDAO, blobStore, cassandraConfiguration);
}
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
index ddfe04b..d48623b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/DeleteMessageListener.java
@@ -19,6 +19,8 @@
package org.apache.james.mailbox.cassandra;
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG;
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK;
import static org.apache.james.util.FunctionalUtils.negate;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
@@ -27,6 +29,7 @@ import java.util.function.Predicate;
import javax.inject.Inject;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.blob.api.BlobStore;
import org.apache.james.events.Event;
import org.apache.james.events.EventListener;
@@ -95,13 +98,16 @@ public class DeleteMessageListener implements EventListener.GroupEventListener {
private final CassandraMailboxCounterDAO counterDAO;
private final CassandraMailboxRecentsDAO recentsDAO;
private final BlobStore blobStore;
+ private final CassandraConfiguration cassandraConfiguration;
@Inject
public DeleteMessageListener(CassandraMessageIdToImapUidDAO imapUidDAO, CassandraMessageIdDAO messageIdDAO, CassandraMessageDAO messageDAO,
CassandraMessageDAOV3 messageDAOV3, CassandraAttachmentDAOV2 attachmentDAO, CassandraAttachmentOwnerDAO ownerDAO,
CassandraAttachmentMessageIdDAO attachmentMessageIdDAO, CassandraACLMapper aclMapper,
CassandraUserMailboxRightsDAO rightsDAO, CassandraApplicableFlagDAO applicableFlagDAO,
- CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO, CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore) {
+ CassandraFirstUnseenDAO firstUnseenDAO, CassandraDeletedMessageDAO deletedMessageDAO,
+ CassandraMailboxCounterDAO counterDAO, CassandraMailboxRecentsDAO recentsDAO, BlobStore blobStore,
+ CassandraConfiguration cassandraConfiguration) {
this.imapUidDAO = imapUidDAO;
this.messageIdDAO = messageIdDAO;
this.messageDAO = messageDAO;
@@ -117,6 +123,7 @@ public class DeleteMessageListener implements EventListener.GroupEventListener {
this.counterDAO = counterDAO;
this.recentsDAO = recentsDAO;
this.blobStore = blobStore;
+ this.cassandraConfiguration = cassandraConfiguration;
}
@Override
@@ -239,15 +246,22 @@ public class DeleteMessageListener implements EventListener.GroupEventListener {
}
private Mono<Boolean> isReferenced(CassandraMessageId id) {
- return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG)
+ return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites())
.hasElements()
.map(negate());
}
private Mono<Boolean> isReferenced(CassandraMessageId id, CassandraId excludedId) {
- return imapUidDAO.retrieve(id, ALL_MAILBOXES, CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG)
+ return imapUidDAO.retrieve(id, ALL_MAILBOXES, chooseReadConsistencyUponWrites())
.filter(metadata -> !metadata.getComposedMessageId().getMailboxId().equals(excludedId))
.hasElements()
.map(negate());
}
+
+ private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() {
+ if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+ return STRONG;
+ }
+ return WEAK;
+ }
}
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
index 9cb0d5e..1b5f58e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdMapper.java
@@ -155,6 +155,13 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
}
+ private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() {
+ if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+ return STRONG;
+ }
+ return WEAK;
+ }
+
@Override
public void save(MailboxMessage mailboxMessage) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailboxMessage.getMailboxId();
@@ -220,7 +227,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
}
private Mono<Void> retrieveAndDeleteIndices(CassandraMessageId messageId, Optional<CassandraId> mailboxId) {
- return imapUidDAO.retrieve(messageId, mailboxId, STRONG)
+ return imapUidDAO.retrieve(messageId, mailboxId, chooseReadConsistencyUponWrites())
.flatMap(this::deleteIds, ReactorUtils.DEFAULT_CONCURRENCY)
.then();
}
@@ -283,7 +290,7 @@ public class CassandraMessageIdMapper implements MessageIdMapper {
private Mono<List<Pair<Flags, ComposedMessageIdWithMetaData>>> updateFlags(MailboxId mailboxId, MessageId messageId, Flags newState, MessageManager.FlagsUpdateMode updateMode) {
CassandraId cassandraId = (CassandraId) mailboxId;
- return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), STRONG)
+ return imapUidDAO.retrieve((CassandraMessageId) messageId, Optional.of(cassandraId), chooseReadConsistencyUponWrites())
.flatMap(oldComposedId -> updateFlags(newState, updateMode, cassandraId, oldComposedId), ReactorUtils.DEFAULT_CONCURRENCY)
.switchIfEmpty(Mono.error(MailboxDeleteDuringUpdateException::new))
.collectList();
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
index 68a7e72..faa3464 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageMapper.java
@@ -20,6 +20,7 @@
package org.apache.james.mailbox.cassandra.mail;
import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.STRONG;
+import static org.apache.james.mailbox.cassandra.mail.CassandraMessageIdToImapUidDAO.ConsistencyChoice.WEAK;
import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
import java.security.SecureRandom;
@@ -409,7 +410,7 @@ public class CassandraMessageMapper implements MessageMapper {
private Mono<FlagsUpdateStageResult> retryUpdatesStage(CassandraId mailboxId, FlagsUpdateCalculator flagsUpdateCalculator, List<ComposedMessageId> failed) {
if (!failed.isEmpty()) {
Flux<ComposedMessageIdWithMetaData> toUpdate = Flux.fromIterable(failed)
- .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), STRONG),
+ .flatMap(ids -> imapUidDAO.retrieve((CassandraMessageId) ids.getMessageId(), Optional.of((CassandraId) ids.getMailboxId()), chooseReadConsistencyUponWrites()),
DEFAULT_CONCURRENCY);
return runUpdateStage(mailboxId, toUpdate, flagsUpdateCalculator);
} else {
@@ -417,6 +418,13 @@ public class CassandraMessageMapper implements MessageMapper {
}
}
+ private CassandraMessageIdToImapUidDAO.ConsistencyChoice chooseReadConsistencyUponWrites() {
+ if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+ return STRONG;
+ }
+ return WEAK;
+ }
+
private Mono<FlagsUpdateStageResult> runUpdateStage(CassandraId mailboxId, Flux<ComposedMessageIdWithMetaData> toBeUpdated, FlagsUpdateCalculator flagsUpdateCalculator) {
return computeNewModSeq(mailboxId)
.flatMapMany(newModSeq -> toBeUpdated
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org