You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/03/09 16:07:43 UTC
[james-project] 12/17: JAMES-3074 Cassandra: On the fly UidValidity
sanitizing
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 819f25da21c96383e930827e585ecedf0c7078a3
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Feb 26 14:37:19 2020 +0700
JAMES-3074 Cassandra: On the fly UidValidity sanitizing
Impact: minor. Statistically, one entry out of 2 billion is affected.
---
.../cassandra/mail/CassandraMailboxDAO.java | 52 +++++++++++++----
.../cassandra/mail/CassandraMailboxDAOTest.java | 68 ++++++++++++++++++++++
2 files changed, 108 insertions(+), 12 deletions(-)
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
index 02a7b13..87c9505 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAO.java
@@ -65,6 +65,7 @@ public class CassandraMailboxDAO {
private final PreparedStatement deleteStatement;
private final PreparedStatement insertStatement;
private final PreparedStatement updateStatement;
+ private final PreparedStatement updateUidValidityStatement;
@Inject
public CassandraMailboxDAO(Session session, CassandraTypesProvider typesProvider, CassandraUtils cassandraUtils) {
@@ -72,6 +73,7 @@ public class CassandraMailboxDAO {
this.mailboxBaseTupleUtil = new MailboxBaseTupleUtil(typesProvider);
this.insertStatement = prepareInsert(session);
this.updateStatement = prepareUpdate(session);
+ this.updateUidValidityStatement = prepareUpdateUidValidity(session);
this.deleteStatement = prepareDelete(session);
this.listStatement = prepareList(session);
this.readStatement = prepareRead(session);
@@ -98,6 +100,12 @@ public class CassandraMailboxDAO {
.where(eq(ID, bindMarker(ID))));
}
+ private PreparedStatement prepareUpdateUidValidity(Session session) {
+ return session.prepare(update(TABLE_NAME)
+ .with(set(UIDVALIDITY, bindMarker(UIDVALIDITY)))
+ .where(eq(ID, bindMarker(ID))));
+ }
+
private PreparedStatement prepareDelete(Session session) {
return session.prepare(QueryBuilder.delete()
.from(TABLE_NAME)
@@ -138,27 +146,47 @@ public class CassandraMailboxDAO {
return executor.executeSingleRow(readStatement.bind()
.setUUID(ID, mailboxId.asUuid())
.setConsistencyLevel(QUORUM))
- .map(row -> mailboxFromRow(row, mailboxId));
+ .flatMap(row -> mailboxFromRow(row, mailboxId));
}
- private Mailbox mailboxFromRow(Row row, CassandraId cassandraId) {
- return new Mailbox(
- new MailboxPath(
- row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.NAMESPACE),
- Username.of(row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.USER)),
- row.getString(NAME)),
- UidValidity.of(row.getLong(UIDVALIDITY)),
- cassandraId);
+ private Mono<Mailbox> mailboxFromRow(Row row, CassandraId cassandraId) {
+ return sanitizeUidValidity(cassandraId, row.getLong(UIDVALIDITY))
+ .map(uidValidity -> new Mailbox(
+ new MailboxPath(
+ row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.NAMESPACE),
+ Username.of(row.getUDTValue(MAILBOX_BASE).getString(CassandraMailboxTable.MailboxBase.USER)),
+ row.getString(NAME)),
+ uidValidity,
+ cassandraId));
+ }
+
+ private Mono<UidValidity> sanitizeUidValidity(CassandraId cassandraId, long uidValidityAsLong) {
+ if (!UidValidity.isValid(uidValidityAsLong)) {
+ UidValidity newUidValidity = UidValidity.generate();
+ return updateUidValidity(cassandraId, newUidValidity)
+ .then(Mono.just(newUidValidity));
+ }
+ return Mono.just(UidValidity.ofValid(uidValidityAsLong));
+ }
+
+ /**
+ * Expected concurrency issue in the absence of performance expensive LightWeight transaction
+ * As the Uid validity is updated only when equal to 0 (1 chance out of 4 billion) the benefits of LWT don't
+ * outweigh the performance costs
+ */
+ private Mono<Void> updateUidValidity(CassandraId cassandraId, UidValidity uidValidity) {
+ return executor.executeVoid(updateUidValidityStatement.bind()
+ .setUUID(ID, cassandraId.asUuid())
+ .setLong(UIDVALIDITY, uidValidity.asLong()));
}
public Flux<Mailbox> retrieveAllMailboxes() {
return executor.execute(listStatement.bind())
.flatMapMany(cassandraUtils::convertToFlux)
- .map(this::toMailboxWithId);
+ .flatMap(this::toMailboxWithId);
}
- private Mailbox toMailboxWithId(Row row) {
+ private Mono<Mailbox> toMailboxWithId(Row row) {
return mailboxFromRow(row, CassandraId.of(row.getUUID(ID)));
}
-
}
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
index 9aa1486..54d0134 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraMailboxDAOTest.java
@@ -19,6 +19,10 @@
package org.apache.james.mailbox.cassandra.mail;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
+import static org.apache.james.backends.cassandra.Scenario.Builder.awaitOn;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.List;
@@ -26,6 +30,8 @@ import java.util.Optional;
import org.apache.james.backends.cassandra.CassandraCluster;
import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.Scenario;
+import org.apache.james.backends.cassandra.Scenario.Barrier;
import org.apache.james.backends.cassandra.components.CassandraModule;
import org.apache.james.backends.cassandra.versions.CassandraSchemaVersionModule;
import org.apache.james.core.Username;
@@ -37,9 +43,13 @@ import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.model.MailboxPath;
import org.apache.james.mailbox.model.UidValidity;
import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
class CassandraMailboxDAOTest {
private static final UidValidity UID_VALIDITY_1 = UidValidity.ofValid(145);
private static final UidValidity UID_VALIDITY_2 = UidValidity.ofValid(147);
@@ -84,6 +94,64 @@ class CassandraMailboxDAOTest {
}
@Test
+ void retrieveMailboxShouldSanitizeInvalidUidValidityUponRead(CassandraCluster cassandra) {
+ testee.save(mailbox1).block();
+
+ // Hack to insert a faulty value
+ cassandra.getConf().execute(update("mailbox")
+ .with(set("uidvalidity", -12))
+ .where(eq("id", CASSANDRA_ID_1.asUuid())));
+
+ Optional<Mailbox> readMailbox = testee.retrieveMailbox(CASSANDRA_ID_1)
+ .blockOptional();
+ assertThat(readMailbox).isPresent()
+ .hasValueSatisfying(mailbox -> assertThat(mailbox.getUidValidity().isValid()).isTrue());
+ }
+
+ @Test
+ void retrieveAllShouldSanitizeInvalidUidValidityUponRead(CassandraCluster cassandra) {
+ testee.save(mailbox1).block();
+
+ // Hack to insert a faulty value
+ cassandra.getConf().execute(update("mailbox")
+ .with(set("uidvalidity", -12))
+ .where(eq("id", CASSANDRA_ID_1.asUuid())));
+
+ List<Mailbox> readMailbox = testee.retrieveAllMailboxes().collectList().block();
+ assertThat(readMailbox).hasSize(1)
+ .allSatisfy(mailbox -> assertThat(mailbox.getUidValidity().isValid()).isTrue());
+ }
+
+ @Disabled("Expected concurrency issue in the absence of performance expensive LightWeight transaction" +
+ "As the Uid validity is updated only when equal to 0 (1 chance out of 4 billion) the benefits of LWT don't" +
+ "outweigh the costs")
+ @Test
+ void retrieveMailboxShouldNotBeSubjectToDataRaceUponUidValiditySanitizing(CassandraCluster cassandra) throws Exception {
+ testee.save(mailbox1).block();
+
+ // Hack to insert a faulty value
+ cassandra.getConf().execute(update("mailbox")
+ .with(set("uidvalidity", -12))
+ .where(eq("id", CASSANDRA_ID_1.asUuid())));
+
+ Barrier barrier = new Barrier(2);
+ cassandra.getConf().registerScenario(awaitOn(barrier)
+ .times(2)
+ .whenQueryStartsWith("UPDATE mailbox SET"));
+
+ Mono<Mailbox> readMailbox1 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
+ Mono<Mailbox> readMailbox2 = testee.retrieveMailbox(CASSANDRA_ID_1).cache();
+ readMailbox1.subscribeOn(Schedulers.elastic()).subscribe();
+ readMailbox2.subscribeOn(Schedulers.elastic()).subscribe();
+
+ barrier.awaitCaller();
+ barrier.releaseCaller();
+
+ assertThat(readMailbox1.block().getUidValidity())
+ .isEqualTo(readMailbox2.block().getUidValidity());
+ }
+
+ @Test
void saveShouldOverride() {
testee.save(mailbox1).block();
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org