You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/14 02:31:46 UTC
[james-project] 20/22: JAMES-3407 Configuration for read repairs
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 5bfa119feda1dece37911650c1e62cd7843d1ae8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 8 16:46:22 2020 +0700
JAMES-3407 Configuration for read repairs
---
.../init/configuration/CassandraConfiguration.java | 32 ++++++++++++++++++++--
.../CassandraMailboxSessionMapperFactory.java | 2 +-
.../cassandra/mail/CassandraMailboxMapper.java | 9 ++++--
.../cassandra/mail/CassandraMailboxMapperTest.java | 3 +-
.../mail/migration/MailboxPathV2MigrationTest.java | 3 +-
5 files changed, 40 insertions(+), 9 deletions(-)
diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
index 42bbce0..1acf52d 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
@@ -39,6 +39,7 @@ public class CassandraConfiguration {
private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(CassandraConfiguration.class);
public static final int DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ = 100;
+ public static final float DEFAULT_MAILBOX_READ_REPAIR = 0.1f;
public static final int DEFAULT_EXPUNGE_BATCH_SIZE = 50;
public static final int DEFAULT_UPDATE_FLAGS_BATCH_SIZE = 20;
public static final int DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY = 1000;
@@ -55,6 +56,7 @@ public class CassandraConfiguration {
public static final List<String> VALID_CONSISTENCY_LEVEL_REGULAR = ImmutableList.of("QUORUM", "LOCAL_QUORUM", "EACH_QUORUM");
public static final List<String> VALID_CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = ImmutableList.of("SERIAL", "LOCAL_SERIAL");
+ private static final String MAILBOX_READ_REPAIR = "mailbox.read.repair.chance";
private static final String MAILBOX_MAX_RETRY_ACL = "mailbox.max.retry.acl";
private static final String MAILBOX_MAX_RETRY_MODSEQ = "mailbox.max.retry.modseq";
private static final String MAILBOX_MAX_RETRY_UID = "mailbox.max.retry.uid";
@@ -85,6 +87,7 @@ public class CassandraConfiguration {
private Optional<Integer> messageAttachmentIdsReadTimeout = Optional.empty();
private Optional<String> consistencyLevelRegular = Optional.empty();
private Optional<String> consistencyLevelLightweightTransaction = Optional.empty();
+ private Optional<Float> mailboxReadRepair = Optional.empty();
public Builder messageReadChunkSize(int value) {
Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
@@ -152,6 +155,13 @@ public class CassandraConfiguration {
return this;
}
+ public Builder mailboxReadRepair(float value) {
+ Preconditions.checkArgument(value >= 0, "mailboxReadRepair needs to be positive");
+ Preconditions.checkArgument(value <= 1, "mailboxReadRepair needs to be less or equal to 1");
+ this.mailboxReadRepair = Optional.of(value);
+ return this;
+ }
+
public Builder messageReadChunkSize(Optional<Integer> value) {
value.ifPresent(this::messageReadChunkSize);
return this;
@@ -207,6 +217,11 @@ public class CassandraConfiguration {
return this;
}
+ public Builder mailboxReadRepair(Optional<Float> value) {
+ value.ifPresent(this::mailboxReadRepair);
+ return this;
+ }
+
public Builder consistencyLevelRegular(String value) {
Preconditions.checkArgument(VALID_CONSISTENCY_LEVEL_REGULAR.contains(value),
"consistencyLevelRegular needs to be one of the following: " + String.join(", ", VALID_CONSISTENCY_LEVEL_REGULAR));
@@ -252,7 +267,8 @@ public class CassandraConfiguration {
attachmentV2MigrationReadTimeout.orElse(DEFAULT_ATTACHMENT_V2_MIGRATION_READ_TIMEOUT),
messageAttachmentIdsReadTimeout.orElse(DEFAULT_MESSAGE_ATTACHMENT_ID_MIGRATION_READ_TIMEOUT),
consistencyLevelRegular,
- consistencyLevelLightweightTransaction);
+ consistencyLevelLightweightTransaction,
+ mailboxReadRepair.orElse(DEFAULT_MAILBOX_READ_REPAIR));
}
}
@@ -288,6 +304,8 @@ public class CassandraConfiguration {
propertiesConfiguration.getString(CONSISTENCY_LEVEL_REGULAR)))
.consistencyLevelLightweightTransaction(Optional.ofNullable(
propertiesConfiguration.getString(CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION)))
+ .mailboxReadRepair(Optional.ofNullable(
+ propertiesConfiguration.getFloat(MAILBOX_READ_REPAIR, null)))
.build();
}
@@ -304,13 +322,14 @@ public class CassandraConfiguration {
private final int messageAttachmentIdsReadTimeout;
private final String consistencyLevelRegular;
private final String consistencyLevelLightweightTransaction;
+ private final float mailboxReadRepair;
@VisibleForTesting
CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry,
int modSeqMaxRetry, int uidMaxRetry, int fetchNextPageInAdvanceRow,
int blobPartSize, final int attachmentV2MigrationReadTimeout, int messageAttachmentIdsReadTimeout,
- String consistencyLevelRegular, String consistencyLevelLightweightTransaction) {
+ String consistencyLevelRegular, String consistencyLevelLightweightTransaction, float mailboxReadRepair) {
this.aclMaxRetry = aclMaxRetry;
this.messageReadChunkSize = messageReadChunkSize;
this.expungeChunkSize = expungeChunkSize;
@@ -324,6 +343,11 @@ public class CassandraConfiguration {
this.messageAttachmentIdsReadTimeout = messageAttachmentIdsReadTimeout;
this.consistencyLevelRegular = consistencyLevelRegular;
this.consistencyLevelLightweightTransaction = consistencyLevelLightweightTransaction;
+ this.mailboxReadRepair = mailboxReadRepair;
+ }
+
+ public float getMailboxReadRepair() {
+ return mailboxReadRepair;
}
public int getBlobPartSize() {
@@ -390,6 +414,7 @@ public class CassandraConfiguration {
&& Objects.equals(this.flagsUpdateMessageMaxRetry, that.flagsUpdateMessageMaxRetry)
&& Objects.equals(this.modSeqMaxRetry, that.modSeqMaxRetry)
&& Objects.equals(this.uidMaxRetry, that.uidMaxRetry)
+ && Objects.equals(this.mailboxReadRepair, that.mailboxReadRepair)
&& Objects.equals(this.fetchNextPageInAdvanceRow, that.fetchNextPageInAdvanceRow)
&& Objects.equals(this.blobPartSize, that.blobPartSize)
&& Objects.equals(this.attachmentV2MigrationReadTimeout, that.attachmentV2MigrationReadTimeout)
@@ -405,7 +430,7 @@ public class CassandraConfiguration {
return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry,
flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow,
blobPartSize, attachmentV2MigrationReadTimeout, messageAttachmentIdsReadTimeout,
- consistencyLevelRegular, consistencyLevelLightweightTransaction);
+ consistencyLevelRegular, consistencyLevelLightweightTransaction, mailboxReadRepair);
}
@Override
@@ -418,6 +443,7 @@ public class CassandraConfiguration {
.add("flagsUpdateMessageMaxRetry", flagsUpdateMessageMaxRetry)
.add("modSeqMaxRetry", modSeqMaxRetry)
.add("fetchNextPageInAdvanceRow", fetchNextPageInAdvanceRow)
+ .add("mailboxReadRepair", mailboxReadRepair)
.add("uidMaxRetry", uidMaxRetry)
.add("blobPartSize", blobPartSize)
.add("attachmentV2MigrationReadTimeout", attachmentV2MigrationReadTimeout)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 9767519..3cd91fd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -168,7 +168,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
@Override
public MailboxMapper createMailboxMapper(MailboxSession mailboxSession) {
- return new CassandraMailboxMapper(mailboxDAO, mailboxPathDAO, mailboxPathV2DAO, mailboxPathV3DAO, userMailboxRightsDAO, aclMapper, versionManager);
+ return new CassandraMailboxMapper(mailboxDAO, mailboxPathDAO, mailboxPathV2DAO, mailboxPathV3DAO, userMailboxRightsDAO, aclMapper, versionManager, cassandraConfiguration);
}
@Override
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index e6d868d..e918b8e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -25,6 +25,7 @@ import java.time.Duration;
import javax.inject.Inject;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
import org.apache.james.backends.cassandra.versions.SchemaVersion;
import org.apache.james.core.Username;
@@ -60,7 +61,6 @@ public class CassandraMailboxMapper implements MailboxMapper {
private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(10);
private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(1000);
private static final SchemaVersion MAILBOX_PATH_V_3_MIGRATION_PERFORMED_VERSION = new SchemaVersion(8);
- private static final float READ_REPAIR_CHANCE = 0.1f;
private final CassandraMailboxDAO mailboxDAO;
private final CassandraMailboxPathDAOImpl mailboxPathDAO;
@@ -69,6 +69,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
private final CassandraACLMapper cassandraACLMapper;
private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
private final CassandraSchemaVersionManager versionManager;
+ private final CassandraConfiguration cassandraConfiguration;
private final SecureRandom secureRandom;
@Inject
@@ -78,7 +79,8 @@ public class CassandraMailboxMapper implements MailboxMapper {
CassandraMailboxPathV3DAO mailboxPathV3DAO,
CassandraUserMailboxRightsDAO userMailboxRightsDAO,
CassandraACLMapper aclMapper,
- CassandraSchemaVersionManager versionManager) {
+ CassandraSchemaVersionManager versionManager,
+ CassandraConfiguration cassandraConfiguration) {
this.mailboxDAO = mailboxDAO;
this.mailboxPathDAO = mailboxPathDAO;
this.mailboxPathV2DAO = mailboxPathV2DAO;
@@ -86,6 +88,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
this.userMailboxRightsDAO = userMailboxRightsDAO;
this.cassandraACLMapper = aclMapper;
this.versionManager = versionManager;
+ this.cassandraConfiguration = cassandraConfiguration;
this.secureRandom = new SecureRandom();
}
@@ -134,7 +137,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
}
private boolean shouldReadRepair() {
- return secureRandom.nextFloat() < READ_REPAIR_CHANCE;
+ return secureRandom.nextFloat() < cassandraConfiguration.getMailboxReadRepair();
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 0be1024..2b9d1d7 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -119,7 +119,8 @@ class CassandraMailboxMapperTest {
mailboxPathV3DAO,
userMailboxRightsDAO,
aclMapper,
- new CassandraSchemaVersionManager(versionDAO));
+ new CassandraSchemaVersionManager(versionDAO),
+ CassandraConfiguration.DEFAULT_CONFIGURATION);
}
@Nested
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
index c0832ab..6a4ef9f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
@@ -103,7 +103,8 @@ class MailboxPathV2MigrationTest {
userMailboxRightsDAO,
CassandraConfiguration.DEFAULT_CONFIGURATION,
cassandraCluster.getCassandraConsistenciesConfiguration()),
- new CassandraSchemaVersionManager(new CassandraSchemaVersionDAO(cassandra.getConf())));
+ new CassandraSchemaVersionManager(new CassandraSchemaVersionDAO(cassandra.getConf())),
+ CassandraConfiguration.DEFAULT_CONFIGURATION);
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org