You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/09 16:07:43 UTC

[james-project] 12/17: JAMES-3074 Cassandra: On the fly UidValidity sanitizing

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 819f25da21c96383e930827e585ecedf0c7078a3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 26 14:37:19 2020 +0700

    JAMES-3074 Cassandra: On the fly UidValidity sanitizing
    
    Impact: minor. Statistically, one entry out of 2 billion is affected.
---
 .../cassandra/mail/CassandraMailboxDAO.java        | 52 +++++++++++++----
 .../cassandra/mail/CassandraMailboxDAOTest.java    | 68 ++++++++++++++++++++++
 2 files changed, 108 insertions(+), 12 deletions(-)

diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 02a7b13..87c9505 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -65,6 +65,7 @@ public class CassandraMailboxDAO {
     private final PreparedStatement deleteStatement;
     private final PreparedStatement insertStatement;
     private final PreparedStatement updateStatement;
+    private final PreparedStatement updateUidValidityStatement;
 
     @Inject
     public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils) {
@@ -72,6 +73,7 @@ public class CassandraMailboxDAO {
         this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider);
         this.insertStatement = prepareInsert(session);
         this.updateStatement = prepareUpdate(session);
+        this.updateUidValidityStatement = prepareUpdateUidValidity(session);
         this.deleteStatement = prepareDelete(session);
         this.listStatement = prepareList(session);
         this.readStatement = prepareRead(session);
@@ -98,6 +100,12 @@ public class CassandraMailboxDAO {
             .where(eq(ID, bindMarker(ID))));
     }
 
+    private PreparedStatement prepareUpdateUidValidity(Session session) {
+        return session.prepare(update(TABLE_NAME)
+            .with(set(UIDVALIDITY, bindMarker(UIDVALIDITY)))
+            .where(eq(ID, bindMarker(ID))));
+    }
+
     private PreparedStatement prepareDelete(Session session) {
         return session.prepare(QueryBuilder.delete()
             .from(TABLE_NAME)
@@ -138,27 +146,47 @@ public class CassandraMailboxDAO {
         return executor.executeSingleRow(readStatement.bind()
             .setUUID(ID, mailboxId.asUuid())
             .setConsistencyLevel(QUORUM))
-            .map(row -> mailboxFromRow(row, mailboxId));
+            .flatMap(row -> mailboxFromRow(row, mailboxId));
     }
 
-    private Mailbox mailboxFromRow(Row row, CassandraId cassandraId) {
-        return new Mailbox(
-            new MailboxPath(
-                row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.NAMESPACE),
-                Username.of(row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.USER)),
-                row.getString(NAME)),
-            UidValidity.of(row.getLong(UIDVALIDITY)),
-            cassandraId);
+    private Mono<Mailbox> mailboxFromRow(Row row, CassandraId cassandraId) {
+        return sanitizeUidValidity(cassandraId, row.getLong(UIDVALIDITY))
+            .map(uidValidity -> new Mailbox(
+                new MailboxPath(
+                    row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.NAMESPACE),
+                    Username.of(row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.USER)),
+                    row.getString(NAME)),
+                uidValidity,
+                cassandraId));
+    }
+    
+    private Mono<UidValidity> sanitizeUidValidity(CassandraId cassandraId, long uidValidityAsLong) {
+        if (!UidValidity.isValid(uidValidityAsLong)) {
+            UidValidity newUidValidity = UidValidity.generate();
+            return updateUidValidity(cassandraId, newUidValidity)
+                .then(Mono.just(newUidValidity));
+        }
+        return Mono.just(UidValidity.ofValid(uidValidityAsLong));
+    }
+
+    /**
+     * Expected concurrency issue: this update is not guarded by a (performance-expensive) lightweight transaction (LWT).
+     * As the UidValidity is only rewritten when the stored value is invalid, e.g. 0 (1 chance out of 4 billion),
+     * the benefits of an LWT don't outweigh its performance costs.
+     */
+    private Mono<Void> updateUidValidity(CassandraId cassandraId, UidValidity uidValidity) {
+        return executor.executeVoid(updateUidValidityStatement.bind()
+                .setUUID(ID, cassandraId.asUuid())
+                .setLong(UIDVALIDITY, uidValidity.asLong()));
     }
 
     public Flux<Mailbox> retrieveAllMailboxes() {
         return executor.execute(listStatement.bind())
             .flatMapMany(cassandraUtils::convertToFlux)
-            .map(this::toMailboxWithId);
+            .flatMap(this::toMailboxWithId);
     }
 
-    private Mailbox toMailboxWithId(Row row) {
+    private Mono<Mailbox> toMailboxWithId(Row row) {
         return mailboxFromRow(row, CassandraId.of(row.getUUID(ID)));
     }
-
 }
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
index 9aa1486..54d0134 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
@@ -19,6 +19,10 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static org.apache.james.backends.cassandra.Scenario.Builder.awaitOn;
 import static org.assertj.core.api.Assertions.assertThat;
 
 import java.util.List;
@@ -26,6 +30,8 @@ import java.util.Optional;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.Scenario;
+import org.apache.james.backends.cassandra.Scenario.Barrier;
 import org.apache.james.backends.cassandra.components.CassandraModule;
 import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
 import org.apache.james.core.Username;
@@ -37,9 +43,13 @@ import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.model.MailboxPath;
 import org.apache.james.mailbox.model.UidValidity;
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 class CassandraMailboxDAOTest {
     private static final UidValidity UID_VALIDITY_1 = UidValidity.ofValid(145);
     private static final UidValidity UID_VALIDITY_2 = UidValidity.ofValid(147);
@@ -84,6 +94,64 @@ class CassandraMailboxDAOTest {
     }
 
     @Test
+    void retrieveMailboxShouldSanitizeInvalidUidValidityUponRead(CassandraCluster cassandra) {
+        testee.save(mailbox1).block();
+
+        // Hack to insert a faulty value
+        cassandra.getConf().execute(update("mailbox")
+            .with(set("uidvalidity", -12))
+            .where(eq("id", CASSANDRA_ID_1.asUuid())));
+
+        Optional<Mailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1)
+            .blockOptional();
+        assertThat(readMailbox).isPresent()
+            .hasValueSatisfying(mailbox -> assertThat(mailbox.getUidValidity().isValid()).isTrue());
+    }
+
+    @Test
+    void retrieveAllShouldSanitizeInvalidUidValidityUponRead(CassandraCluster cassandra) {
+        testee.save(mailbox1).block();
+
+        // Hack to insert a faulty value
+        cassandra.getConf().execute(update("mailbox")
+            .with(set("uidvalidity", -12))
+            .where(eq("id", CASSANDRA_ID_1.asUuid())));
+
+        List<Mailbox> readMailbox = testee.retrieveAllMailboxes().collectList().block();
+        assertThat(readMailbox).hasSize(1)
+            .allSatisfy(mailbox -> assertThat(mailbox.getUidValidity().isValid()).isTrue());
+    }
+
+    @Disabled("Expected concurrency issue in the absence of performance expensive LightWeight transaction. " +
+        "As the Uid validity is updated only when equal to 0 (1 chance out of 4 billion) the benefits of LWT don't " +
+        "outweigh the costs")
+    @Test
+    void retrieveMailboxShouldNotBeSubjectToDataRaceUponUidValiditySanitizing(CassandraCluster cassandra) throws Exception {
+        testee.save(mailbox1).block();
+
+        // Hack to insert a faulty value
+        cassandra.getConf().execute(update("mailbox")
+            .with(set("uidvalidity", -12))
+            .where(eq("id", CASSANDRA_ID_1.asUuid())));
+
+        Barrier barrier = new Barrier(2);
+        cassandra.getConf().registerScenario(awaitOn(barrier)
+            .times(2)
+            .whenQueryStartsWith("UPDATE mailbox SET"));
+
+        Mono<Mailbox> readMailbox1 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
+        Mono<Mailbox> readMailbox2 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
+        readMailbox1.subscribeOn(Schedulers.elastic()).subscribe();
+        readMailbox2.subscribeOn(Schedulers.elastic()).subscribe();
+
+        barrier.awaitCaller();
+        barrier.releaseCaller();
+
+        assertThat(readMailbox1.block().getUidValidity())
+            .isEqualTo(readMailbox2.block().getUidValidity());
+    }
+
+    @Test
     void saveShouldOverride() {
         testee.save(mailbox1).block();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org