You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/04/08 03:56:52 UTC
[james-project] 02/12: JAMES-3435 Configuration option:
message.write.strong.consistency
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit d39460fcc063a4757e18663c9e2efe8673d96866
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Mar 28 14:27:47 2021 +0700
JAMES-3435 Configuration option: message.write.strong.consistency
---
.../init/configuration/CassandraConfiguration.java | 26 +++++++++++++--
.../mail/CassandraMessageIdToImapUidDAO.java | 38 ++++++++++++++--------
2 files changed, 48 insertions(+), 16 deletions(-)
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
index ad46dd9..57e60a4 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
@@ -74,6 +74,7 @@ public class CassandraConfiguration {
private static final String ATTACHMENT_V2_MIGRATION_READ_TIMEOUT = "attachment.v2.migration.read.timeout";
private static final String MESSAGE_ATTACHMENTID_READ_TIMEOUT = "message.attachmentids.read.timeout";
private static final String MESSAGE_READ_STRONG_CONSISTENCY = "message.read.strong.consistency";
+ private static final String MESSAGE_WRITE_STRONG_CONSISTENCY = "message.write.strong.consistency.unsafe";
private static final String CONSISTENCY_LEVEL_REGULAR = "cassandra.consistency_level.regular";
private static final String CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = "cassandra.consistency_level.lightweight_transaction";
@@ -97,6 +98,7 @@ public class CassandraConfiguration {
private Optional<Float> mailboxCountersReadRepairMax = Optional.empty();
private Optional<Float> mailboxCountersReadRepairChanceOneHundred = Optional.empty();
private Optional<Boolean> messageReadStrongConsistency = Optional.empty();
+ private Optional<Boolean> messageWriteStrongConsistency = Optional.empty();
public Builder messageReadStrongConsistency(boolean value) {
this.messageReadStrongConsistency = Optional.of(value);
@@ -108,6 +110,16 @@ public class CassandraConfiguration {
return this;
}
+ public Builder messageWriteStrongConsistency(boolean value) {
+ this.messageWriteStrongConsistency = Optional.of(value);
+ return this;
+ }
+
+ public Builder messageWriteStrongConsistency(Optional<Boolean> value) {
+ this.messageWriteStrongConsistency = value;
+ return this;
+ }
+
public Builder messageReadChunkSize(int value) {
Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
this.messageReadChunkSize = Optional.of(value);
@@ -314,7 +326,8 @@ public class CassandraConfiguration {
mailboxReadRepair.orElse(DEFAULT_MAILBOX_READ_REPAIR),
mailboxCountersReadRepairMax.orElse(DEFAULT_MAX_MAILBOX_COUNTERS_READ_REPAIR_CHANCE),
mailboxCountersReadRepairChanceOneHundred.orElse(DEFAULT_ONE_HUNDRED_MAILBOX_COUNTERS_READ_REPAIR_CHANCE),
- messageReadStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY));
+ messageReadStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY),
+ messageWriteStrongConsistency.orElse(DEFAULT_STRONG_CONSISTENCY));
}
}
@@ -358,6 +371,8 @@ public class CassandraConfiguration {
propertiesConfiguration.getFloat(MAILBOX_ONE_HUNDRED_COUNTERS_READ_REPAIR, null)))
.messageReadStrongConsistency(Optional.ofNullable(
propertiesConfiguration.getBoolean(MESSAGE_READ_STRONG_CONSISTENCY, null)))
+ .messageWriteStrongConsistency(Optional.ofNullable(
+ propertiesConfiguration.getBoolean(MESSAGE_WRITE_STRONG_CONSISTENCY, null)))
.build();
}
@@ -378,6 +393,7 @@ public class CassandraConfiguration {
private final float mailboxCountersReadRepairChanceMax;
private final float mailboxCountersReadRepairChanceOneHundred;
private final boolean messageReadStrongConsistency;
+ private final boolean messageWriteStrongConsistency;
@VisibleForTesting
CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
@@ -386,7 +402,8 @@ public class CassandraConfiguration {
int blobPartSize, final int attachmentV2MigrationReadTimeout, int messageAttachmentIdsReadTimeout,
String consistencyLevelRegular, String consistencyLevelLightweightTransaction,
float mailboxReadRepair, float mailboxCountersReadRepairChanceMax,
- float mailboxCountersReadRepairChanceOneHundred, boolean messageReadStrongConsistency) {
+ float mailboxCountersReadRepairChanceOneHundred, boolean messageReadStrongConsistency,
+ boolean messageWriteStrongConsistency) {
this.aclMaxRetry = aclMaxRetry;
this.messageReadChunkSize = messageReadChunkSize;
this.expungeChunkSize = expungeChunkSize;
@@ -404,6 +421,11 @@ public class CassandraConfiguration {
this.mailboxCountersReadRepairChanceMax = mailboxCountersReadRepairChanceMax;
this.mailboxCountersReadRepairChanceOneHundred = mailboxCountersReadRepairChanceOneHundred;
this.messageReadStrongConsistency = messageReadStrongConsistency;
+ this.messageWriteStrongConsistency = messageWriteStrongConsistency;
+ }
+
+ public boolean isMessageWriteStrongConsistency() {
+ return messageWriteStrongConsistency;
}
public boolean isMessageReadStrongConsistency() {
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
index 0bb92ea..e4bce1f 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMessageIdToImapUidDAO.java
@@ -57,6 +57,7 @@ import org.apache.james.mailbox.cassandra.ids.CassandraMessageId.Factory;
import org.apache.james.mailbox.model.ComposedMessageId;
import org.apache.james.mailbox.model.ComposedMessageIdWithMetaData;
+import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ConsistencyLevel;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
@@ -103,6 +104,7 @@ public class CassandraMessageIdToImapUidDAO {
CassandraMessageId.Factory messageIdFactory) {
this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.consistenciesConfiguration = consistenciesConfiguration;
+ this.cassandraConfiguration = cassandraConfiguration;
this.messageIdFactory = messageIdFactory;
this.delete = prepareDelete(session);
this.insert = prepareInsert(session);
@@ -197,20 +199,28 @@ public class CassandraMessageIdToImapUidDAO {
public Mono<Boolean> updateMetadata(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, ModSeq oldModSeq) {
ComposedMessageId composedMessageId = composedMessageIdWithMetaData.getComposedMessageId();
Flags flags = composedMessageIdWithMetaData.getFlags();
- return cassandraAsyncExecutor.executeReturnApplied(update.bind()
- .setLong(MOD_SEQ, composedMessageIdWithMetaData.getModSeq().asLong())
- .setBool(ANSWERED, flags.contains(Flag.ANSWERED))
- .setBool(DELETED, flags.contains(Flag.DELETED))
- .setBool(DRAFT, flags.contains(Flag.DRAFT))
- .setBool(FLAGGED, flags.contains(Flag.FLAGGED))
- .setBool(RECENT, flags.contains(Flag.RECENT))
- .setBool(SEEN, flags.contains(Flag.SEEN))
- .setBool(USER, flags.contains(Flag.USER))
- .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()))
- .setUUID(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
- .setUUID(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
- .setLong(IMAP_UID, composedMessageId.getUid().asLong())
- .setLong(MOD_SEQ_CONDITION, oldModSeq.asLong()));
+ return cassandraAsyncExecutor.executeReturnApplied(updateBoundStatement(composedMessageIdWithMetaData, composedMessageId, flags, oldModSeq));
+ }
+
+ private BoundStatement updateBoundStatement(ComposedMessageIdWithMetaData composedMessageIdWithMetaData, ComposedMessageId composedMessageId, Flags flags,
+ ModSeq oldModSeq) {
+ final BoundStatement boundStatement = update.bind()
+ .setLong(MOD_SEQ, composedMessageIdWithMetaData.getModSeq().asLong())
+ .setBool(ANSWERED, flags.contains(Flag.ANSWERED))
+ .setBool(DELETED, flags.contains(Flag.DELETED))
+ .setBool(DRAFT, flags.contains(Flag.DRAFT))
+ .setBool(FLAGGED, flags.contains(Flag.FLAGGED))
+ .setBool(RECENT, flags.contains(Flag.RECENT))
+ .setBool(SEEN, flags.contains(Flag.SEEN))
+ .setBool(USER, flags.contains(Flag.USER))
+ .setSet(USER_FLAGS, ImmutableSet.copyOf(flags.getUserFlags()))
+ .setUUID(MESSAGE_ID, ((CassandraMessageId) composedMessageId.getMessageId()).get())
+ .setUUID(MAILBOX_ID, ((CassandraId) composedMessageId.getMailboxId()).asUuid())
+ .setLong(IMAP_UID, composedMessageId.getUid().asLong());
+ if (cassandraConfiguration.isMessageWriteStrongConsistency()) {
+ return boundStatement.setLong(MOD_SEQ_CONDITION, oldModSeq.asLong());
+ }
+ return boundStatement;
}
public Flux<ComposedMessageIdWithMetaData> retrieve(CassandraMessageId messageId, Optional<CassandraId> mailboxId, ConsistencyChoice readConsistencyChoice) {
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org