You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2017/09/29 07:22:07 UTC
[29/31] james-project git commit: MAILBOX-307 Implement setAcl with a
Cassandra read before writes
MAILBOX-307 Implement setAcl with a Cassandra read before writes
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d5e1cf8f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d5e1cf8f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d5e1cf8f
Branch: refs/heads/master
Commit: d5e1cf8f39550d5cc7234a9d864414dd2d881c65
Parents: 998446b
Author: benwa <bt...@linagora.com>
Authored: Thu Sep 28 15:19:47 2017 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Fri Sep 29 09:20:41 2017 +0200
----------------------------------------------------------------------
.../cassandra/mail/CassandraACLMapper.java | 75 +++++++++-----------
1 file changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/d5e1cf8f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index a0dccf0..1e67d41 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -29,10 +29,10 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
import org.apache.james.backends.cassandra.init.CassandraConfiguration;
import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraConstants;
import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -49,7 +49,6 @@ import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.querybuilder.Insert;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Throwables;
@@ -66,7 +65,6 @@ public class CassandraACLMapper {
private final CassandraAsyncExecutor executor;
private final int maxRetry;
private final CodeInjector codeInjector;
- private final PreparedStatement insertStatement;
private final PreparedStatement conditionalInsertStatement;
private final PreparedStatement conditionalUpdateStatement;
private final PreparedStatement readStatement;
@@ -79,12 +77,20 @@ public class CassandraACLMapper {
this.executor = new CassandraAsyncExecutor(session);
this.maxRetry = cassandraConfiguration.getAclMaxRetry();
this.codeInjector = codeInjector;
- this.insertStatement = session.prepare(insertCqlBase());
- this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists());
+ this.conditionalInsertStatement = prepareConditionalInsert(session);
this.conditionalUpdateStatement = prepareConditionalUpdate(session);
this.readStatement = prepareReadStatement(session);
}
+ private PreparedStatement prepareConditionalInsert(Session session) {
+ return session.prepare(
+ insertInto(CassandraACLTable.TABLE_NAME)
+ .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
+ .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
+ .value(CassandraACLTable.VERSION, INITIAL_VALUE)
+ .ifNotExists());
+ }
+
private PreparedStatement prepareConditionalUpdate(Session session) {
return session.prepare(
update(CassandraACLTable.TABLE_NAME)
@@ -101,13 +107,6 @@ public class CassandraACLMapper {
.where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
}
- private Insert insertCqlBase() {
- return insertInto(CassandraACLTable.TABLE_NAME)
- .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
- .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
- .value(CassandraACLTable.VERSION, INITIAL_VALUE);
- }
-
public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
return getStoredACLRow(cassandraId)
.thenApply(resultSet -> getAcl(cassandraId, resultSet));
@@ -122,51 +121,42 @@ public class CassandraACLMapper {
}
public void updateACL(CassandraId cassandraId, MailboxACL.ACLCommand command) throws MailboxException {
+ MailboxACL replacement = MailboxACL.EMPTY.apply(command);
+
+ updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement);
+ }
+
+ public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) throws MailboxException {
+ updateAcl(cassandraId,
+ acl -> new ACLWithVersion(acl.version, mailboxACL),
+ mailboxACL);
+ }
+
+ private void updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
try {
new FunctionRunnerWithRetry(maxRetry)
.execute(
() -> {
codeInjector.inject();
- ResultSet resultSet = getAclWithVersion(cassandraId)
- .map(aclWithVersion -> aclWithVersion.apply(command))
+ return getAclWithVersion(cassandraId)
+ .map(aclTransformation)
.map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
- .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
- return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
+ .orElseGet(() -> insertACL(cassandraId, replacement));
});
} catch (LightweightTransactionException e) {
throw new MailboxException("Exception during lightweight transaction", e);
}
}
- public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) {
- try {
- executor.executeVoid(
- insertStatement.bind()
- .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
- .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(mailboxACL)))
- .join();
- } catch (JsonProcessingException e) {
- throw Throwables.propagate(e);
- }
- }
-
- private MailboxACL applyCommandOnEmptyACL(MailboxACL.ACLCommand command) {
- try {
- return MailboxACL.EMPTY.apply(command);
- } catch (UnsupportedRightException exception) {
- throw Throwables.propagate(exception);
- }
- }
-
private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
return executor.execute(
readStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
}
- private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
+ private boolean updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
try {
- return executor.execute(
+ return executor.executeReturnApplied(
conditionalUpdateStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid())
.setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
@@ -178,9 +168,9 @@ public class CassandraACLMapper {
}
}
- private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) {
+ private boolean insertACL(CassandraId cassandraId, MailboxACL acl) {
try {
- return executor.execute(
+ return executor.executeReturnApplied(
conditionalInsertStatement.bind()
.setUUID(CassandraACLTable.ID, cassandraId.asUuid())
.setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(acl)))
@@ -196,7 +186,10 @@ public class CassandraACLMapper {
return Optional.empty();
}
Row row = resultSet.one();
- return Optional.of(new ACLWithVersion(row.getLong(CassandraACLTable.VERSION), deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL))));
+ return Optional.of(
+ new ACLWithVersion(
+ row.getLong(CassandraACLTable.VERSION),
+ deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL))));
}
private MailboxACL deserializeACL(CassandraId cassandraId, String serializedACL) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org