You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/04/08 03:56:52 UTC

[james-project] 02/12: JAMES-3435 Configuration option: message.write.strong.consistency

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit d39460fcc063a4757e18663c9e2efe8673d96866
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Mar 28 14:27:47 2021 +0700

    JAMES-3435 Configuration option: message.write.strong.consistency
---
 .../init/configuration/CassandraConfiguration.java | 26 +++++++++++++--
 .../mail/CassandraMessageIdToImapUidDAO.java       | 38 ++++++++++++++--------
 2 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
index ad46dd9..57e60a4 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
@@ -74,6 +74,7 @@ public class CassandraConfiguration {
     private static final String ATTACHMENT_V2_MIGRATION_READ_TIMEOUT = "attachment.v2.migration.read.timeout";
     private static final String MESSAGE_ATTACHMENTID_READ_TIMEOUT = "message.attachmentids.read.timeout";
     private static final String MESSAGE_READ_STRONG_CONSISTENCY = "message.read.strong.consistency";
+    private static final String MESSAGE_WRITE_STRONG_CONSISTENCY = "message.write.strong.consistency.unsafe";
     private static final String CONSISTENCY_LEVEL_REGULAR = "cassandra.consistency_level.regular";
     private static final String CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = "cassandra.consistency_level.lightweight_transaction";
 
@@ -97,6 +98,7 @@ public class CassandraConfiguration {
         private Optional<Float> mailboxCountersReadRepairMax = Optional.empty();
         private Optional<Float> mailboxCountersReadRepairChanceOneHundred = Optional.empty();
         private Optional<Boolean> messageReadStrongConsistency = Optional.empty();
+        private Optional<Boolean> messageWriteStrongConsistency = Optional.empty();
 
         public Builder messageReadStrongConsistency(boolean value) {
             this.messageReadStrongConsistency = Optional.of(value);
@@ -108,6 +110,16 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder messageWriteStrongConsistency(boolean value) {
+            this.messageWriteStrongConsistency = Optional.of(value);
+            return this;
+        }
+
+        public Builder messageWriteStrongConsistency(Optional<Boolean> value) {
+            this.messageWriteStrongConsistency = value;
+            return this;
+        }
+
         public Builder messageReadChunkSize(int value) {
             Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
             this.messageReadChunkSize = Optional.of(value);
@@ -314,7 +326,8 @@ public class CassandraConfiguration {
                 mailboxReadRepair.orElse(DEFAULT_MAILBOX_READ_REPAIR),
                 mailboxCountersReadRepairMax.orElse(DEFAULT_MAX_MAILBOX_COUNTERS_READ_REPAIR_CHANCE),
                 mailboxCountersReadRepairChanceOneHundred.orElse(DEFAULT_ONE_HUNDRED_MAILBOX_COUNTERS_READ_REPAIR_CHANCE),
-                messageReadStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY));
+                messageReadStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY),
+                messageWriteStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY));
         }
     }
 
@@ -358,6 +371,8 @@ public class CassandraConfiguration {
                 propertiesConfiguration.getFloat(MAILBOX_ONE_HUNDRED_COUNTERS_READ_REPAIR, null)))
             .messageReadStrongConsistency(Optional.ofNullable(
                 propertiesConfiguration.getBoolean(MESSAGE_READ_STRONG_CONSISTENCY, null)))
+            .messageWriteStrongConsistency(Optional.ofNullable(
+                propertiesConfiguration.getBoolean(MESSAGE_WRITE_STRONG_CONSISTENCY, null)))
             .build();
     }
 
@@ -378,6 +393,7 @@ public class CassandraConfiguration {
     private final float mailboxCountersReadRepairChanceMax;
     private final float mailboxCountersReadRepairChanceOneHundred;
     private final boolean messageReadStrongConsistency;
+    private final boolean messageWriteStrongConsistency;
 
     @VisibleForTesting
     CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
@@ -386,7 +402,8 @@ public class CassandraConfiguration {
                            int blobPartSize, final int attachmentV2MigrationReadTimeout, int messageAttachmentIdsReadTimeout,
                            String consistencyLevelRegular, String consistencyLevelLightweightTransaction,
                            float mailboxReadRepair, float mailboxCountersReadRepairChanceMax,
-                           float mailboxCountersReadRepairChanceOneHundred, boolean messageReadStrongConsistency) {
+                           float mailboxCountersReadRepairChanceOneHundred, boolean messageReadStrongConsistency,
+                           boolean messageWriteStrongConsistency) {
         this.aclMaxRetry = aclMaxRetry;
         this.messageReadChunkSize = messageReadChunkSize;
         this.expungeChunkSize = expungeChunkSize;
@@ -404,6 +421,11 @@ public class CassandraConfiguration {
         this.mailboxCountersReadRepairChanceMax = mailboxCountersReadRepairChanceMax;
         this.mailboxCountersReadRepairChanceOneHundred = mailboxCountersReadRepairChanceOneHundred;
         this.messageReadStrongConsistency = messageReadStrongConsistency;
+        this.messageWriteStrongConsistency = messageWriteStrongConsistency;
+    }
+
+    public boolean isMessageWriteStrongConsistency() {
+        return messageWriteStrongConsistency;
     }
 
     public boolean isMessageReadStrongConsistency() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index 0bb92ea..e4bce1f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -57,6 +57,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId.Factory;
 import org.apache.james.mailbox.model.ComposedMessageId;
 import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
 
+import com.datastax.driver.core.BoundStatement;
 import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
@@ -103,6 +104,7 @@ public class CassandraMessageIdToImapUidDAO {
                                           CassandraMessageId.Factory messageIdFactory) {
         this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.consistenciesConfiguration = consistenciesConfiguration;
+        this.cassandraConfiguration = cassandraConfiguration;
         this.messageIdFactory = messageIdFactory;
         this.delete = prepareDelete(session);
         this.insert = prepareInsert(session);
@@ -197,20 +199,28 @@ public class CassandraMessageIdToImapUidDAO {
     public Mono<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, ModSeq oldModSeq) {
         ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
         Flags flags = composedMessageIdWithMetaData.getFlags();
-        return cassandraAsyncExecutor.executeReturnApplied(update.bind()
-                .setLong(MOD_SEQ, composedMessageIdWithMetaData.getModSeq().asLong())
-                .setBool(ANSWERED, flags.contains(Flag.ANSWERED))
-                .setBool(DELETED, flags.contains(Flag.DELETED))
-                .setBool(DRAFT, flags.contains(Flag.DRAFT))
-                .setBool(FLAGGED, flags.contains(Flag.FLAGGED))
-                .setBool(RECENT, flags.contains(Flag.RECENT))
-                .setBool(SEEN, flags.contains(Flag.SEEN))
-                .setBool(USER, flags.contains(Flag.USER))
-                .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()))
-                .setUUID(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
-                .setUUID(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
-                .setLong(IMAP_UID, composedMessageId.getUid().asLong())
-                .setLong(MOD_SEQ_CONDITION, oldModSeq.asLong()));
+        return cassandraAsyncExecutor.executeReturnApplied(updateBoundStatement(composedMessageIdWithMetaData, composedMessageId, flags, oldModSeq));
+    }
+
+    private BoundStatement updateBoundStatement(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, ComposedMessageId composedMessageId, Flags flags,
+                                                ModSeq oldModSeq) {
+        final BoundStatement boundStatement = update.bind()
+            .setLong(MOD_SEQ, composedMessageIdWithMetaData.getModSeq().asLong())
+            .setBool(ANSWERED, flags.contains(Flag.ANSWERED))
+            .setBool(DELETED, flags.contains(Flag.DELETED))
+            .setBool(DRAFT, flags.contains(Flag.DRAFT))
+            .setBool(FLAGGED, flags.contains(Flag.FLAGGED))
+            .setBool(RECENT, flags.contains(Flag.RECENT))
+            .setBool(SEEN, flags.contains(Flag.SEEN))
+            .setBool(USER, flags.contains(Flag.USER))
+            .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()))
+            .setUUID(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
+            .setUUID(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
+            .setLong(IMAP_UID, composedMessageId.getUid().asLong());
+        if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+            return boundStatement.setLong(MOD_SEQ_CONDITION, oldModSeq.asLong());
+        }
+        return boundStatement;
     }
 
     public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId, ConsistencyChoice readConsistencyChoice) {

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org