You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2021/04/08 03:56:59 UTC

[james-project] 09/12: JAMES-3435 Allow to avoid mailbox SERIAL reads when not linked to writes

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 23131f2dd6fdf4d45f4940bbcc5524424437d64e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Sun Mar 28 22:57:33 2021 +0700

    JAMES-3435 Allow to avoid mailbox SERIAL reads when not linked to writes
    
    Read-repairs are now considered as writes, and thus require strong consistency.
---
 .../cassandra/mail/CassandraMailboxMapper.java     | 19 ++++++++----
 .../cassandra/mail/CassandraMailboxPathV3DAO.java  | 34 ++++++++++++++++++++--
 .../task/SolveMailboxInconsistenciesService.java   |  3 +-
 .../mail/CassandraMailboxPathV3DAOTest.java        |  3 +-
 4 files changed, 49 insertions(+), 10 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
index f4dbd1d..4ee3a75 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxMapper.java
@@ -19,6 +19,9 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO.ConsistencyChoice.STRONG;
+import static org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO.ConsistencyChoice.WEAK;
+
 import java.security.SecureRandom;
 import java.time.Duration;
 
@@ -101,7 +104,7 @@ public class CassandraMailboxMapper implements MailboxMapper {
             return mailboxDAO.retrieveMailbox(id)
                 .flatMap(mailboxEntry -> SolveMailboxInconsistenciesService.Inconsistency
                     .detectMailboxDaoInconsistency(mailboxEntry,
-                        mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath()))
+                        mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath(), STRONG))
                     .flatMap(inconsistency ->
                         inconsistency.fix(new SolveMailboxInconsistenciesService.Context(), mailboxDAO, mailboxPathV3DAO)
                             .then(Mono.just(mailboxEntry))));
@@ -111,11 +114,17 @@ public class CassandraMailboxMapper implements MailboxMapper {
 
     private Mono<Mailbox> performReadRepair(MailboxPath path) {
         if (shouldReadRepair()) {
-            return mailboxPathV3DAO.retrieve(path)
+            return mailboxPathV3DAO.retrieve(path, STRONG)
                 .flatMap(this::performPathReadRepair);
         }
-        return mailboxPathV3DAO.retrieve(path);
+        return mailboxPathV3DAO.retrieve(path, consistencyChoice());
+    }
 
+    private CassandraMailboxPathV3DAO.ConsistencyChoice consistencyChoice() {
+        if (cassandraConfiguration.isMailboxReadStrongConsistency()) {
+            return STRONG;
+        }
+        return WEAK;
     }
 
     private Flux<Mailbox> performReadRepair(Flux<Mailbox> pathEntries) {
@@ -250,13 +259,13 @@ public class CassandraMailboxMapper implements MailboxMapper {
             .flatMapMany(needSupport -> {
                 if (needSupport) {
                     return Flux.concat(
-                        mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser),
+                        mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser, consistencyChoice()),
                         Flux.concat(
                                 mailboxPathV2DAO.listUserMailboxes(fixedNamespace, fixedUser),
                                 mailboxPathDAO.listUserMailboxes(fixedNamespace, fixedUser))
                             .flatMap(this::retrieveMailbox, CONCURRENCY));
                 }
-                return mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser);
+                return mailboxPathV3DAO.listUserMailboxes(fixedNamespace, fixedUser, consistencyChoice());
             });
     }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
index 3a6d46b..af34896 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAO.java
@@ -32,6 +32,8 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV3Tab
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV3Table.UIDVALIDITY;
 import static org.apache.james.mailbox.cassandra.table.CassandraMailboxPathV3Table.USER;
 
+import java.util.function.Function;
+
 import javax.inject.Inject;
 
 import org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration;
@@ -46,6 +48,7 @@ import org.apache.james.mailbox.model.UidValidity;
 import org.apache.james.util.FunctionalUtils;
 import org.apache.james.util.ReactorUtils;
 
+import com.datastax.driver.core.ConsistencyLevel;
 import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
@@ -55,6 +58,22 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 public class CassandraMailboxPathV3DAO {
+    // todo factorize me in CassandraConsistenciesConfiguration
+    public enum ConsistencyChoice {
+        WEAK(CassandraConsistenciesConfiguration::getRegular),
+        STRONG(CassandraConsistenciesConfiguration::getLightweightTransaction);
+
+        private final Function<CassandraConsistenciesConfiguration, ConsistencyLevel> choice;
+
+        ConsistencyChoice(Function<CassandraConsistenciesConfiguration, ConsistencyLevel> choice) {
+            this.choice = choice;
+        }
+
+        public ConsistencyLevel choose(CassandraConsistenciesConfiguration configuration) {
+            return choice.apply(configuration);
+        }
+    }
+
     private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final CassandraUtils cassandraUtils;
     private final PreparedStatement delete;
@@ -117,22 +136,31 @@ public class CassandraMailboxPathV3DAO {
     }
 
     public Mono<Mailbox> retrieve(MailboxPath mailboxPath) {
+        return retrieve(mailboxPath, consistenciesConfiguration.getLightweightTransaction());
+    }
+
+    public Mono<Mailbox> retrieve(MailboxPath mailboxPath, ConsistencyChoice consistencyChoice) {
+        return retrieve(mailboxPath, consistencyChoice.choose(consistenciesConfiguration));
+    }
+
+    private Mono<Mailbox> retrieve(MailboxPath mailboxPath, ConsistencyLevel consistencyLevel) {
         return cassandraAsyncExecutor.executeSingleRow(
             select.bind()
                 .setString(NAMESPACE, mailboxPath.getNamespace())
                 .setString(USER, sanitizeUser(mailboxPath.getUser()))
                 .setString(MAILBOX_NAME, mailboxPath.getName())
-                .setConsistencyLevel(consistenciesConfiguration.getLightweightTransaction()))
+                .setConsistencyLevel(consistencyLevel))
             .map(this::fromRowToCassandraIdAndPath)
             .map(FunctionalUtils.toFunction(this::logGhostMailboxSuccess))
             .switchIfEmpty(ReactorUtils.executeAndEmpty(() -> logGhostMailboxFailure(mailboxPath)));
     }
 
-    public Flux<Mailbox> listUserMailboxes(String namespace, Username user) {
+    public Flux<Mailbox> listUserMailboxes(String namespace, Username user, ConsistencyChoice consistencyChoice) {
         return cassandraAsyncExecutor.execute(
             selectUser.bind()
                 .setString(NAMESPACE, namespace)
-                .setString(USER, sanitizeUser(user)))
+                .setString(USER, sanitizeUser(user))
+                .setConsistencyLevel(consistencyChoice.choose(consistenciesConfiguration)))
             .flatMapMany(cassandraUtils::convertToFlux)
             .map(this::fromRowToCassandraIdAndPath)
             .map(FunctionalUtils.toFunction(this::logReadSuccess));
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
index cef2c29..5f5f695 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/task/SolveMailboxInconsistenciesService.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail.task;
 
+import static org.apache.james.mailbox.cassandra.mail.CassandraMailboxPathV3DAO.ConsistencyChoice.STRONG;
 import static org.apache.james.util.ReactorUtils.DEFAULT_CONCURRENCY;
 
 import java.util.Collection;
@@ -423,7 +424,7 @@ public class SolveMailboxInconsistenciesService {
     }
 
     private Mono<Inconsistency> detectMailboxDaoInconsistency(Mailbox mailboxEntry) {
-        Mono<Mailbox> pathEntry = mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath());
+        Mono<Mailbox> pathEntry = mailboxPathV3DAO.retrieve(mailboxEntry.generateAssociatedPath(), STRONG);
         return Inconsistency.detectMailboxDaoInconsistency(mailboxEntry, pathEntry);
     }
 
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java
index 92d1aa9..a59c759 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxPathV3DAOTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static org.apache.james.backends.cassandra.init.configuration.CassandraConsistenciesConfiguration.ConsistencyChoice.STRONG;
 import static org.apache.james.mailbox.cassandra.mail.MailboxFixture.MAILBOX_1;
 import static org.apache.james.mailbox.cassandra.mail.MailboxFixture.MAILBOX_2;
 import static org.apache.james.mailbox.cassandra.mail.MailboxFixture.MAILBOX_3;
@@ -95,7 +96,7 @@ class CassandraMailboxPathV3DAOTest {
         testee.save(MAILBOX_3).block();
 
         List<Mailbox> cassandraIds = testee
-            .listUserMailboxes(USER_INBOX_MAILBOXPATH.getNamespace(), USER_INBOX_MAILBOXPATH.getUser())
+            .listUserMailboxes(USER_INBOX_MAILBOXPATH.getNamespace(), USER_INBOX_MAILBOXPATH.getUser(), STRONG)
             .collectList()
             .block();
 

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org