You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2017/09/29 07:21:56 UTC
[18/31] james-project git commit: MAILBOX-307 Prepare Cassandra ACL
statements
MAILBOX-307 Prepare Cassandra ACL statements
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/91d303b1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/91d303b1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/91d303b1
Branch: refs/heads/master
Commit: 91d303b1dc72bd2a69a3b1ed8e7b3b86e7d85ca9
Parents: ee68d17
Author: benwa <bt...@linagora.com>
Authored: Mon Sep 25 17:17:47 2017 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Fri Sep 29 09:20:40 2017 +0200
----------------------------------------------------------------------
.../cassandra/mail/CassandraACLMapper.java | 118 ++++++++++++-------
1 file changed, 76 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/91d303b1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 5695ffc..b2603a2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.cassandra.mail;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -45,15 +46,18 @@ import org.apache.james.mailbox.store.json.SimpleMailboxACLJsonConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Insert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables;
public class CassandraACLMapper {
- private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
public static final int INITIAL_VALUE = 0;
+ private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
+ private static final String OLD_VERSION = "oldVersion";
@FunctionalInterface
public interface CodeInjector {
@@ -61,43 +65,75 @@ public class CassandraACLMapper {
}
private final CassandraAsyncExecutor executor;
- private final Session session;
private final int maxRetry;
private final CodeInjector codeInjector;
+ private final PreparedStatement insertStatement;
+ private final PreparedStatement conditionalInsertStatement;
+ private final PreparedStatement conditionalUpdateStatement;
+ private final PreparedStatement readStatement;
public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration) {
this(session, cassandraConfiguration, () -> {});
}
public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) {
- this.session = session;
this.executor = new CassandraAsyncExecutor(session);
this.maxRetry = cassandraConfiguration.getAclMaxRetry();
this.codeInjector = codeInjector;
+ this.insertStatement = session.prepare(insertCqlBase());
+ this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists());
+ this.conditionalUpdateStatement = prepareConditionalUpdate(session);
+ this.readStatement = prepareReadStatement(session);
+ }
+
+ private PreparedStatement prepareConditionalUpdate(Session session) {
+ return session.prepare(
+ update(CassandraACLTable.TABLE_NAME)
+ .where(eq(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID)))
+ .with(set(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL)))
+ .and(set(CassandraACLTable.VERSION, bindMarker(CassandraACLTable.VERSION)))
+ .onlyIf(eq(CassandraACLTable.VERSION, bindMarker(OLD_VERSION))));
+ }
+
+ private PreparedStatement prepareReadStatement(Session session) {
+ return session.prepare(
+ select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
+ .from(CassandraACLTable.TABLE_NAME)
+ .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
+ }
+
+ private Insert insertCqlBase() {
+ return insertInto(CassandraACLTable.TABLE_NAME)
+ .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
+ .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
+ .value(CassandraACLTable.VERSION, INITIAL_VALUE);
}
public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
- return getStoredACLRow(cassandraId).thenApply(resultSet -> {
- if (resultSet.isExhausted()) {
- return SimpleMailboxACL.EMPTY;
- }
- String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
- return deserializeACL(cassandraId, serializedACL);
- });
+ return getStoredACLRow(cassandraId)
+ .thenApply(resultSet -> getAcl(cassandraId, resultSet));
+ }
+
+ private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) {
+ if (resultSet.isExhausted()) {
+ return SimpleMailboxACL.EMPTY;
+ }
+ String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
+ return deserializeACL(cassandraId, serializedACL);
}
public void updateACL(CassandraId cassandraId, MailboxACL.MailboxACLCommand command) throws MailboxException {
try {
- new FunctionRunnerWithRetry(maxRetry).execute(
- () -> {
- codeInjector.inject();
- ResultSet resultSet = getAclWithVersion(cassandraId)
- .map(aclWithVersion -> aclWithVersion.apply(command))
- .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
- .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
- return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
- }
- );
+ new FunctionRunnerWithRetry(maxRetry)
+ .execute(
+ () -> {
+ codeInjector.inject();
+ ResultSet resultSet = getAclWithVersion(cassandraId)
+ .map(aclWithVersion -> aclWithVersion.apply(command))
+ .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
+ .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
+ return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
+ });
} catch (LightweightTransactionException e) {
throw new MailboxException("Exception during lightweight transaction", e);
}
@@ -105,11 +141,11 @@ public class CassandraACLMapper {
public void resetACL(CassandraId cassandraId, MailboxACL mailboxACL) {
try {
- session.execute(
- insertInto(CassandraACLTable.TABLE_NAME)
- .value(CassandraACLTable.ID, cassandraId.asUuid())
- .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL))
- .value(CassandraACLTable.VERSION, INITIAL_VALUE));
+ executor.executeVoid(
+ insertStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+ .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL)))
+ .join();
} catch (JsonProcessingException e) {
throw Throwables.propagate(e);
}
@@ -124,20 +160,20 @@ public class CassandraACLMapper {
}
private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
- return executor.execute(select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
- .from(CassandraACLTable.TABLE_NAME)
- .where(eq(CassandraMailboxTable.ID, cassandraId.asUuid())));
+ return executor.execute(
+ readStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
}
private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
try {
- return session.execute(
- update(CassandraACLTable.TABLE_NAME)
- .with(set(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL)))
- .and(set(CassandraACLTable.VERSION, aclWithVersion.version + 1))
- .where(eq(CassandraACLTable.ID, cassandraId.asUuid()))
- .onlyIf(eq(CassandraACLTable.VERSION, aclWithVersion.version))
- );
+ return executor.execute(
+ conditionalUpdateStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+ .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
+ .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
+ .setLong(OLD_VERSION, aclWithVersion.version))
+ .join();
} catch (JsonProcessingException exception) {
throw Throwables.propagate(exception);
}
@@ -145,13 +181,11 @@ public class CassandraACLMapper {
private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) {
try {
- return session.execute(
- insertInto(CassandraACLTable.TABLE_NAME)
- .value(CassandraACLTable.ID, cassandraId.asUuid())
- .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl))
- .value(CassandraACLTable.VERSION, 0)
- .ifNotExists()
- );
+ return executor.execute(
+ conditionalInsertStatement.bind()
+ .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+ .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl)))
+ .join();
} catch (JsonProcessingException exception) {
throw Throwables.propagate(exception);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org