You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2020/10/14 02:31:46 UTC

[james-project] 20/22: JAMES-3407 Configuration for read repairs

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 5bfa119feda1dece37911650c1e62cd7843d1ae8
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Oct 8 16:46:22 2020 +0700

    JAMES-3407 Configuration for read repairs
---
 .../init/configuration/CassandraConfiguration.java | 32 ++++++++++++++++++++--
 .../CassandraMailboxSessionMapperFactory.java      |  2 +-
 .../cassandra/mail/CassandraMailboxMapper.java     |  9 ++++--
 .../cassandra/mail/CassandraMailboxMapperTest.java |  3 +-
 .../mail/migration/MailboxPathV2MigrationTest.java |  3 +-
 5 files changed, 40 insertions(+), 9 deletions(-)

diff --git a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
index 42bbce0..1acf52d 100644
--- a/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
+++ b/backends-common/cassandra/src/main/java/org/apache/james/backends/cassandra/init/configuration/CassandraConfiguration.java
@@ -39,6 +39,7 @@ public class CassandraConfiguration {
     private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(CassandraConfiguration.class);
 
     public static final int DEFAULT_MESSAGE_CHUNK_SIZE_ON_READ = 100;
+    public static final float DEFAULT_MAILBOX_READ_REPAIR = 0.1f;
     public static final int DEFAULT_EXPUNGE_BATCH_SIZE = 50;
     public static final int DEFAULT_UPDATE_FLAGS_BATCH_SIZE = 20;
     public static final int DEFAULT_FLAGS_UPDATE_MESSAGE_MAX_RETRY = 1000;
@@ -55,6 +56,7 @@ public class CassandraConfiguration {
     public static final List<String> VALID_CONSISTENCY_LEVEL_REGULAR = ImmutableList.of("QUORUM", "LOCAL_QUORUM", "EACH_QUORUM");
     public static final List<String> VALID_CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION = ImmutableList.of("SERIAL", "LOCAL_SERIAL");
 
+    private static final String MAILBOX_READ_REPAIR = "mailbox.read.repair.chance";
     private static final String MAILBOX_MAX_RETRY_ACL = "mailbox.max.retry.acl";
     private static final String MAILBOX_MAX_RETRY_MODSEQ = "mailbox.max.retry.modseq";
     private static final String MAILBOX_MAX_RETRY_UID = "mailbox.max.retry.uid";
@@ -85,6 +87,7 @@ public class CassandraConfiguration {
         private Optional<Integer> messageAttachmentIdsReadTimeout = Optional.empty();
         private Optional<String> consistencyLevelRegular = Optional.empty();
         private Optional<String> consistencyLevelLightweightTransaction = Optional.empty();
+        private Optional<Float> mailboxReadRepair = Optional.empty();
 
         public Builder messageReadChunkSize(int value) {
             Preconditions.checkArgument(value > 0, "messageReadChunkSize needs to be strictly positive");
@@ -152,6 +155,13 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder mailboxReadRepair(float value) {
+            Preconditions.checkArgument(value >= 0, "mailboxReadRepair needs to be positive");
+            Preconditions.checkArgument(value <= 1, "mailboxReadRepair needs to be less or equal to 1");
+            this.mailboxReadRepair = Optional.of(value);
+            return this;
+        }
+
         public Builder messageReadChunkSize(Optional<Integer> value) {
             value.ifPresent(this::messageReadChunkSize);
             return this;
@@ -207,6 +217,11 @@ public class CassandraConfiguration {
             return this;
         }
 
+        public Builder mailboxReadRepair(Optional<Float> value) {
+            value.ifPresent(this::mailboxReadRepair);
+            return this;
+        }
+
         public Builder consistencyLevelRegular(String value) {
             Preconditions.checkArgument(VALID_CONSISTENCY_LEVEL_REGULAR.contains(value),
                 "consistencyLevelRegular needs to be one of the following: " + String.join(", ", VALID_CONSISTENCY_LEVEL_REGULAR));
@@ -252,7 +267,8 @@ public class CassandraConfiguration {
                 attachmentV2MigrationReadTimeout.orElse(DEFAULT_ATTACHMENT_V2_MIGRATION_READ_TIMEOUT),
                 messageAttachmentIdsReadTimeout.orElse(DEFAULT_MESSAGE_ATTACHMENT_ID_MIGRATION_READ_TIMEOUT),
                 consistencyLevelRegular,
-                consistencyLevelLightweightTransaction);
+                consistencyLevelLightweightTransaction,
+                mailboxReadRepair.orElse(DEFAULT_MAILBOX_READ_REPAIR));
         }
     }
 
@@ -288,6 +304,8 @@ public class CassandraConfiguration {
                     propertiesConfiguration.getString(CONSISTENCY_LEVEL_REGULAR)))
             .consistencyLevelLightweightTransaction(Optional.ofNullable(
                     propertiesConfiguration.getString(CONSISTENCY_LEVEL_LIGHTWEIGHT_TRANSACTION)))
+            .mailboxReadRepair(Optional.ofNullable(
+                propertiesConfiguration.getFloat(MAILBOX_READ_REPAIR, null)))
             .build();
     }
 
@@ -304,13 +322,14 @@ public class CassandraConfiguration {
     private final int messageAttachmentIdsReadTimeout;
     private final String consistencyLevelRegular;
     private final String consistencyLevelLightweightTransaction;
+    private final float mailboxReadRepair;
 
     @VisibleForTesting
     CassandraConfiguration(int aclMaxRetry, int messageReadChunkSize, int expungeChunkSize,
                            int flagsUpdateMessageIdMaxRetry, int flagsUpdateMessageMaxRetry,
                            int modSeqMaxRetry, int uidMaxRetry, int fetchNextPageInAdvanceRow,
                            int blobPartSize, final int attachmentV2MigrationReadTimeout, int messageAttachmentIdsReadTimeout,
-                           String consistencyLevelRegular, String consistencyLevelLightweightTransaction) {
+                           String consistencyLevelRegular, String consistencyLevelLightweightTransaction, float mailboxReadRepair) {
         this.aclMaxRetry = aclMaxRetry;
         this.messageReadChunkSize = messageReadChunkSize;
         this.expungeChunkSize = expungeChunkSize;
@@ -324,6 +343,11 @@ public class CassandraConfiguration {
         this.messageAttachmentIdsReadTimeout = messageAttachmentIdsReadTimeout;
         this.consistencyLevelRegular = consistencyLevelRegular;
         this.consistencyLevelLightweightTransaction = consistencyLevelLightweightTransaction;
+        this.mailboxReadRepair = mailboxReadRepair;
+    }
+
+    public float getMailboxReadRepair() {
+        return mailboxReadRepair;
     }
 
     public int getBlobPartSize() {
@@ -390,6 +414,7 @@ public class CassandraConfiguration {
                 && Objects.equals(this.flagsUpdateMessageMaxRetry, that.flagsUpdateMessageMaxRetry)
                 && Objects.equals(this.modSeqMaxRetry, that.modSeqMaxRetry)
                 && Objects.equals(this.uidMaxRetry, that.uidMaxRetry)
+                && Objects.equals(this.mailboxReadRepair, that.mailboxReadRepair)
                 && Objects.equals(this.fetchNextPageInAdvanceRow, that.fetchNextPageInAdvanceRow)
                 && Objects.equals(this.blobPartSize, that.blobPartSize)
                 && Objects.equals(this.attachmentV2MigrationReadTimeout, that.attachmentV2MigrationReadTimeout)
@@ -405,7 +430,7 @@ public class CassandraConfiguration {
         return Objects.hash(aclMaxRetry, messageReadChunkSize, expungeChunkSize, flagsUpdateMessageIdMaxRetry,
             flagsUpdateMessageMaxRetry, modSeqMaxRetry, uidMaxRetry, fetchNextPageInAdvanceRow,
             blobPartSize, attachmentV2MigrationReadTimeout, messageAttachmentIdsReadTimeout,
-            consistencyLevelRegular, consistencyLevelLightweightTransaction);
+            consistencyLevelRegular, consistencyLevelLightweightTransaction, mailboxReadRepair);
     }
 
     @Override
@@ -418,6 +443,7 @@ public class CassandraConfiguration {
             .add("flagsUpdateMessageMaxRetry", flagsUpdateMessageMaxRetry)
             .add("modSeqMaxRetry", modSeqMaxRetry)
             .add("fetchNextPageInAdvanceRow", fetchNextPageInAdvanceRow)
+            .add("mailboxReadRepair", mailboxReadRepair)
             .add("uidMaxRetry", uidMaxRetry)
             .add("blobPartSize", blobPartSize)
             .add("attachmentV2MigrationReadTimeout", attachmentV2MigrationReadTimeout)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
index 9767519..3cd91fd 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/CassandraMailboxSessionMapperFactory.java
@@ -168,7 +168,7 @@ public class CassandraMailboxSessionMapperFactory extends MailboxSessionMapperFa
 
     @Override
     public MailboxMapper createMailboxMapper(MailboxSession mailboxSession) {
-        return new CassandraMailboxMapper(mailboxDAO, mailboxPathDAO, mailboxPathV2DAO, mailboxPathV3DAO, userMailboxRightsDAO, aclMapper, versionManager);
+        return new CassandraMailboxMapper(mailboxDAO, mailboxPathDAO, mailboxPathV2DAO, mailboxPathV3DAO, userMailboxRightsDAO, aclMapper, versionManager, cassandraConfiguration);
     }
 
     @Override
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index e6d868d..e918b8e 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -25,6 +25,7 @@ import java.time.Duration;
 import javax.inject.Inject;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionManager;
 import org.apache.james.backends.cassandra.versions.SchemaVersion;
 import org.apache.james.core.Username;
@@ -60,7 +61,6 @@ public class CassandraMailboxMapper implements MailboxMapper {
     private static final Duration MIN_RETRY_BACKOFF = Duration.ofMillis(10);
     private static final Duration MAX_RETRY_BACKOFF = Duration.ofMillis(1000);
     private static final SchemaVersion MAILBOX_PATH_V_3_MIGRATION_PERFORMED_VERSION = new SchemaVersion(8);
-    private static final float READ_REPAIR_CHANCE = 0.1f;
 
     private final CassandraMailboxDAO mailboxDAO;
     private final CassandraMailboxPathDAOImpl mailboxPathDAO;
@@ -69,6 +69,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
     private final CassandraACLMapper cassandraACLMapper;
     private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
     private final CassandraSchemaVersionManager versionManager;
+    private final CassandraConfiguration cassandraConfiguration;
     private final SecureRandom secureRandom;
 
     @Inject
@@ -78,7 +79,8 @@ public class CassandraMailboxMapper implements MailboxMapper {
                                   CassandraMailboxPathV3DAO mailboxPathV3DAO,
                                   CassandraUserMailboxRightsDAO userMailboxRightsDAO,
                                   CassandraACLMapper aclMapper,
-                                  CassandraSchemaVersionManager versionManager) {
+                                  CassandraSchemaVersionManager versionManager,
+                                  CassandraConfiguration cassandraConfiguration) {
         this.mailboxDAO = mailboxDAO;
         this.mailboxPathDAO = mailboxPathDAO;
         this.mailboxPathV2DAO = mailboxPathV2DAO;
@@ -86,6 +88,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
         this.userMailboxRightsDAO = userMailboxRightsDAO;
         this.cassandraACLMapper = aclMapper;
         this.versionManager = versionManager;
+        this.cassandraConfiguration = cassandraConfiguration;
         this.secureRandom = new SecureRandom();
     }
 
@@ -134,7 +137,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
     }
 
     private boolean shouldReadRepair() {
-        return secureRandom.nextFloat() < READ_REPAIR_CHANCE;
+        return secureRandom.nextFloat() < cassandraConfiguration.getMailboxReadRepair();
     }
 
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
index 0be1024..2b9d1d7 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapperTest.java
@@ -119,7 +119,8 @@ class CassandraMailboxMapperTest {
             mailboxPathV3DAO,
             userMailboxRightsDAO,
             aclMapper,
-            new CassandraSchemaVersionManager(versionDAO));
+            new CassandraSchemaVersionManager(versionDAO),
+            CassandraConfiguration.DEFAULT_CONFIGURATION);
     }
 
     @Nested
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
index c0832ab..6a4ef9f 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/migration/MailboxPathV2MigrationTest.java
@@ -103,7 +103,8 @@ class MailboxPathV2MigrationTest {
                 userMailboxRightsDAO,
                 CassandraConfiguration.DEFAULT_CONFIGURATION,
                 cassandraCluster.getCassandraConsistenciesConfiguration()),
-            new CassandraSchemaVersionManager(new CassandraSchemaVersionDAO(cassandra.getConf())));
+            new CassandraSchemaVersionManager(new CassandraSchemaVersionDAO(cassandra.getConf())),
+            CassandraConfiguration.DEFAULT_CONFIGURATION);
     }
 
     @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org