You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2017/09/29 07:21:56 UTC

[18/31] james-project git commit: MAILBOX-307 Prepare Cassandra ACL statements

MAILBOX-307 Prepare Cassandra ACL statements


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/91d303b1
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/91d303b1
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/91d303b1

Branch: refs/heads/master
Commit: 91d303b1dc72bd2a69a3b1ed8e7b3b86e7d85ca9
Parents: ee68d17
Author: benwa <bt...@linagora.com>
Authored: Mon Sep 25 17:17:47 2017 +0700
Committer: Matthieu Baechler <ma...@apache.org>
Committed: Fri Sep 29 09:20:40 2017 +0200

----------------------------------------------------------------------
 .../cassandra/mail/CassandraACLMapper.java      | 118 ++++++++++++-------
 1 file changed, 76 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/91d303b1/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index 5695ffc..b2603a2 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -45,15 +46,18 @@ import org.apache.james.mailbox.store.json.SimpleMailboxACLJsonConverter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Row;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.querybuilder.Insert;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.google.common.base.Throwables;
 
 public class CassandraACLMapper {
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
     public static final int INITIAL_VALUE = 0;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
+    private static final String OLD_VERSION = "oldVersion";
 
     @FunctionalInterface
     public interface CodeInjector {
@@ -61,43 +65,75 @@ public class CassandraACLMapper {
     }
 
     private final CassandraAsyncExecutor executor;
-    private final Session session;
     private final int maxRetry;
     private final CodeInjector codeInjector;
+    private final PreparedStatement insertStatement;
+    private final PreparedStatement conditionalInsertStatement;
+    private final PreparedStatement conditionalUpdateStatement;
+    private final PreparedStatement readStatement;
 
     public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration) {
         this(session, cassandraConfiguration, () -> {});
     }
 
     public CassandraACLMapper(Session session, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) {
-        this.session = session;
         this.executor = new CassandraAsyncExecutor(session);
         this.maxRetry = cassandraConfiguration.getAclMaxRetry();
         this.codeInjector = codeInjector;
+        this.insertStatement = session.prepare(insertCqlBase());
+        this.conditionalInsertStatement = session.prepare(insertCqlBase().ifNotExists());
+        this.conditionalUpdateStatement = prepareConditionalUpdate(session);
+        this.readStatement = prepareReadStatement(session);
+    }
+
+    private PreparedStatement prepareConditionalUpdate(Session session) {
+        return session.prepare(
+            update(CassandraACLTable.TABLE_NAME)
+                .where(eq(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID)))
+                .with(set(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL)))
+                .and(set(CassandraACLTable.VERSION, bindMarker(CassandraACLTable.VERSION)))
+                .onlyIf(eq(CassandraACLTable.VERSION, bindMarker(OLD_VERSION))));
+    }
+
+    private PreparedStatement prepareReadStatement(Session session) {
+        return session.prepare(
+            select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
+                .from(CassandraACLTable.TABLE_NAME)
+                .where(eq(CassandraMailboxTable.ID, bindMarker(CassandraACLTable.ID))));
+    }
+
+    private Insert insertCqlBase() {
+        return insertInto(CassandraACLTable.TABLE_NAME)
+            .value(CassandraACLTable.ID, bindMarker(CassandraACLTable.ID))
+            .value(CassandraACLTable.ACL, bindMarker(CassandraACLTable.ACL))
+            .value(CassandraACLTable.VERSION, INITIAL_VALUE);
     }
 
     public CompletableFuture<MailboxACL> getACL(CassandraId cassandraId) {
-        return  getStoredACLRow(cassandraId).thenApply(resultSet -> {
-            if (resultSet.isExhausted()) {
-                return SimpleMailboxACL.EMPTY;
-            }
-            String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
-            return deserializeACL(cassandraId, serializedACL);
-        });
+        return getStoredACLRow(cassandraId)
+            .thenApply(resultSet -> getAcl(cassandraId, resultSet));
+    }
+
+    private MailboxACL getAcl(CassandraId cassandraId, ResultSet resultSet) {
+        if (resultSet.isExhausted()) {
+            return SimpleMailboxACL.EMPTY;
+        }
+        String serializedACL = resultSet.one().getString(CassandraACLTable.ACL);
+        return deserializeACL(cassandraId, serializedACL);
     }
 
     public void updateACL(CassandraId cassandraId, MailboxACL.MailboxACLCommand command) throws MailboxException {
         try {
-            new FunctionRunnerWithRetry(maxRetry).execute(
-                () -> {
-                    codeInjector.inject();
-                    ResultSet resultSet = getAclWithVersion(cassandraId)
-                        .map(aclWithVersion -> aclWithVersion.apply(command))
-                        .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
-                        .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
-                    return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
-                }
-            );
+            new FunctionRunnerWithRetry(maxRetry)
+                .execute(
+                    () -> {
+                        codeInjector.inject();
+                        ResultSet resultSet = getAclWithVersion(cassandraId)
+                            .map(aclWithVersion -> aclWithVersion.apply(command))
+                            .map(aclWithVersion -> updateStoredACL(cassandraId, aclWithVersion))
+                            .orElseGet(() -> insertACL(cassandraId, applyCommandOnEmptyACL(command)));
+                        return resultSet.one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED);
+                    });
         } catch (LightweightTransactionException e) {
             throw new MailboxException("Exception during lightweight transaction", e);
         }
@@ -105,11 +141,11 @@ public class CassandraACLMapper {
 
     public void resetACL(CassandraId cassandraId, MailboxACL mailboxACL) {
         try {
-            session.execute(
-                insertInto(CassandraACLTable.TABLE_NAME)
-                    .value(CassandraACLTable.ID, cassandraId.asUuid())
-                    .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL))
-                    .value(CassandraACLTable.VERSION, INITIAL_VALUE));
+            executor.executeVoid(
+                insertStatement.bind()
+                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+                    .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(mailboxACL)))
+                .join();
         } catch (JsonProcessingException e) {
             throw Throwables.propagate(e);
         }
@@ -124,20 +160,20 @@ public class CassandraACLMapper {
     }
 
     private CompletableFuture<ResultSet> getStoredACLRow(CassandraId cassandraId) {
-        return executor.execute(select(CassandraACLTable.ACL, CassandraACLTable.VERSION)
-            .from(CassandraACLTable.TABLE_NAME)
-            .where(eq(CassandraMailboxTable.ID, cassandraId.asUuid())));
+        return executor.execute(
+            readStatement.bind()
+                .setUUID(CassandraACLTable.ID, cassandraId.asUuid()));
     }
 
     private ResultSet updateStoredACL(CassandraId cassandraId, ACLWithVersion aclWithVersion) {
         try {
-            return session.execute(
-                update(CassandraACLTable.TABLE_NAME)
-                    .with(set(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL)))
-                    .and(set(CassandraACLTable.VERSION, aclWithVersion.version + 1))
-                    .where(eq(CassandraACLTable.ID, cassandraId.asUuid()))
-                    .onlyIf(eq(CassandraACLTable.VERSION, aclWithVersion.version))
-            );
+            return executor.execute(
+                conditionalUpdateStatement.bind()
+                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+                    .setString(CassandraACLTable.ACL,  SimpleMailboxACLJsonConverter.toJson(aclWithVersion.mailboxACL))
+                    .setLong(CassandraACLTable.VERSION, aclWithVersion.version + 1)
+                    .setLong(OLD_VERSION, aclWithVersion.version))
+                .join();
         } catch (JsonProcessingException exception) {
             throw Throwables.propagate(exception);
         }
@@ -145,13 +181,11 @@ public class CassandraACLMapper {
 
     private ResultSet insertACL(CassandraId cassandraId, MailboxACL acl) {
         try {
-            return session.execute(
-                insertInto(CassandraACLTable.TABLE_NAME)
-                    .value(CassandraACLTable.ID, cassandraId.asUuid())
-                    .value(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl))
-                    .value(CassandraACLTable.VERSION, 0)
-                    .ifNotExists()
-            );
+            return executor.execute(
+                conditionalInsertStatement.bind()
+                    .setUUID(CassandraACLTable.ID, cassandraId.asUuid())
+                    .setString(CassandraACLTable.ACL, SimpleMailboxACLJsonConverter.toJson(acl)))
+                .join();
         } catch (JsonProcessingException exception) {
             throw Throwables.propagate(exception);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org