You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2017/09/29 07:22:07 UTC

[29/31] james-project git commit: MAILBOX-307 Implement setAcl with a Cassandra read before writes

MAILBOX-307 Implement setAcl with a Cassandra read before writes


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/d5e1cf8f
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/d5e1cf8f
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/d5e1cf8f

Branch: refs/heads/master
Commit: d5e1cf8f39550d5cc7234a9d864414dd2d881c65
Parents: 998446b
Author: benwa <bt...@linagora.com>
Authored: Thu Sep 28 15:19:47 2017 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Fri Sep 29 09:20:41 2017 +0200

----------------------------------------------------------------------
 .../cassandra/mail/CassandraACLMapper.java      | 75 +++++++++-----------
 1 file changed, 34 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/d5e1cf8f/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index a0dccf0..1e67d41 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -29,10 +29,10 @@ import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
 import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
 
 import org.apache.james.backends.cassandra.init.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
-import org.apache.james.backends.cassandra.utils.CassandraConstants;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
 import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -49,7 +49,6 @@ import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.querybuilder.Insert;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Throwables;
 
@@ -66,7 +65,6 @@ public class CassandraACLMapper {
     private final CassandraAsyncExecutor executor;
     private final int maxRetry;
     private final CodeInjector codeInjector;
-    private final PreparedStatement insertStatement;
     private final PreparedStatement conditionalInsertStatement;
     private final PreparedStatement conditionalUpdateStatement;
     private final PreparedStatement readStatement;
@@ -79,12 +77,20 @@ public class CassandraACLMapper {
         this.executor = new CassandraAsyncExecutor(session);
         this.maxRetry = cassandraConfiguration.getAclMaxRetry();
         this.codeInjector = codeInjector;
-        this.insertStatement = session.prepare(insertCqlBase());
-        this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists());
+        this.conditionalInsertStatement = prepareConditionalInsert(session);
         this.conditionalUpdateStatement = prepareConditionalUpdate(session);
         this.readStatement = prepareReadStatement(session);
     }
 
+    private PreparedStatement prepareConditionalInsert(Session session) {
+        return session.prepare(
+            insertInto(CassandraACLTable.TABLE_NAME)
+                .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
+                .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
+                .value(CassandraACLTable.VERSION, INITIAL_VALUE)
+                .ifNotExists());
+    }
+
     private PreparedStatement prepareConditionalUpdate(Session session) {
         return session.prepare(
             update(CassandraACLTable.TABLE_NAME)
@@ -101,13 +107,6 @@ public class CassandraACLMapper {
                 .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
     }
 
-    private Insert insertCqlBase() {
-        return insertInto(CassandraACLTable.TABLE_NAME)
-            .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
-            .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
-            .value(CassandraACLTable.VERSION, INITIAL_VALUE);
-    }
-
     public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
         return getStoredACLRow(cassandraId)
             .thenApply(resultSet -> getAcl(cassandraId, resultSet));
@@ -122,51 +121,42 @@ public class CassandraACLMapper {
     }
 
     public void updateACL(CassandraId cassandraId, MailboxACL.ACLCommand command) throws MailboxException {
+        MailboxACL replacement = MailboxACL.EMPTY.apply(command);
+
+        updateAcl(cassandraId, aclWithVersion -> aclWithVersion.apply(command), replacement);
+    }
+
+    public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) throws MailboxException {
+        updateAcl(cassandraId,
+            acl -> new ACLWithVersion(acl.version, mailboxACL),
+            mailboxACL);
+    }
+
+    private void updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
         try {
             new FunctionRunnerWithRetry(maxRetry)
                 .execute(
                     () -> {
                         codeInjector.inject();
-                        ResultSet resultSet = getAclWithVersion(cassandraId)
-                            .map(aclWithVersion -> aclWithVersion.apply(command))
+                        return getAclWithVersion(cassandraId)
+                            .map(aclTransformation)
                             .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
-                            .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
-                        return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
+                            .orElseGet(() -> insertACL(cassandraId, replacement));
                     });
         } catch (LightweightTransactionException e) {
             throw new MailboxException("Exception during lightweight transaction", e);
         }
     }
 
-    public void setACL(CassandraId cassandraId, MailboxACL mailboxACL) {
-        try {
-            executor.executeVoid(
-                insertStatement.bind()
-                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
-                    .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(mailboxACL)))
-                .join();
-        } catch (JsonProcessingException e) {
-            throw Throwables.propagate(e);
-        }
-    }
-
-    private MailboxACL applyCommandOnEmptyACL(MailboxACL.ACLCommand command) {
-        try {
-            return MailboxACL.EMPTY.apply(command);
-        } catch (UnsupportedRightException exception) {
-            throw Throwables.propagate(exception);
-        }
-    }
-
     private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
         return executor.execute(
             readStatement.bind()
                 .setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
     }
 
-    private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
+    private boolean updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
         try {
-            return executor.execute(
+            return executor.executeReturnApplied(
                 conditionalUpdateStatement.bind()
                     .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
                     .setString(CassandraACLTable.ACL,  MailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
@@ -178,9 +168,9 @@ public class CassandraACLMapper {
         }
     }
 
-    private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) {
+    private boolean insertACL(CassandraId cassandraId, MailboxACL acl) {
         try {
-            return executor.execute(
+            return executor.executeReturnApplied(
                 conditionalInsertStatement.bind()
                     .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
                     .setString(CassandraACLTable.ACL, MailboxACLJsonConverter.toJson(acl)))
@@ -196,7 +186,10 @@ public class CassandraACLMapper {
             return Optional.empty();
         }
         Row row = resultSet.one();
-        return Optional.of(new ACLWithVersion(row.getLong(CassandraACLTable.VERSION), deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL))));
+        return Optional.of(
+            new ACLWithVersion(
+                row.getLong(CassandraACLTable.VERSION),
+                deserializeACL(cassandraId, row.getString(CassandraACLTable.ACL))));
     }
 
     private MailboxACL deserializeACL(CassandraId cassandraId, String serializedACL) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org