You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/02/25 02:25:40 UTC

[james-project] 03/10: JAMES-3058 Concurrency test for CassandraAclMapper

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 6e16c097c1aa77374b056342036c8f2357f3e509
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Thu Feb 20 16:52:00 2020 +0700

    JAMES-3058 Concurrency test for CassandraAclMapper
    
    The concurrent-update scenario can now be tested without injecting
    test-only code into the business logic, thanks to Cassandra statement
    instrumentation.
---
 .../james/backends/cassandra/TestingSession.java   | 14 ++++++--
 .../mailbox/cassandra/mail/CassandraACLMapper.java | 16 ++--------
 .../cassandra/mail/CassandraACLMapperTest.java     | 37 +++++++++++++++++-----
 3 files changed, 42 insertions(+), 25 deletions(-)

diff --git a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
index 2e879c8..d102148 100644
--- a/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
+++ b/backends-common/cassandra/src/test/java/org/apache/james/backends/cassandra/TestingSession.java
@@ -58,13 +58,21 @@ public class TestingSession implements Session {
 
     public static class Barrier {
         private final CountDownLatch callerLatch = new CountDownLatch(1);
-        private final CountDownLatch awaitCallerLatch = new CountDownLatch(1);
+        private final CountDownLatch awaitCallerLatch;
 
-        void awaitCaller() throws InterruptedException {
+        public Barrier() {
+            this(1);
+        }
+
+        public Barrier(int callerCount) {
+            awaitCallerLatch = new CountDownLatch(callerCount);
+        }
+
+        public void awaitCaller() throws InterruptedException {
             awaitCallerLatch.await();
         }
 
-        void releaseCaller() {
+        public void releaseCaller() {
             callerLatch.countDown();
         }
 
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
index c877d9a..4a2734b 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapper.java
@@ -58,14 +58,8 @@ public class CassandraACLMapper {
     private static final Logger LOG = LoggerFactory.getLogger(CassandraACLMapper.class);
     private static final String OLD_VERSION = "oldVersion";
 
-    @FunctionalInterface
-    public interface CodeInjector {
-        void inject();
-    }
-
     private final CassandraAsyncExecutor executor;
     private final int maxAclRetry;
-    private final CodeInjector codeInjector;
     private final CassandraUserMailboxRightsDAO userMailboxRightsDAO;
     private final PreparedStatement conditionalInsertStatement;
     private final PreparedStatement conditionalUpdateStatement;
@@ -73,13 +67,8 @@ public class CassandraACLMapper {
 
     @Inject
     public CassandraACLMapper(Session session, CassandraUserMailboxRightsDAO userMailboxRightsDAO, CassandraConfiguration cassandraConfiguration) {
-        this(session, userMailboxRightsDAO, cassandraConfiguration, () -> { });
-    }
-
-    public CassandraACLMapper(Session session, CassandraUserMailboxRightsDAO userMailboxRightsDAO, CassandraConfiguration cassandraConfiguration, CodeInjector codeInjector) {
         this.executor = new CassandraAsyncExecutor(session);
         this.maxAclRetry = cassandraConfiguration.getAclMaxRetry();
-        this.codeInjector = codeInjector;
         this.conditionalInsertStatement = prepareConditionalInsert(session);
         this.conditionalUpdateStatement = prepareConditionalUpdate(session);
         this.readStatement = prepareReadStatement(session);
@@ -139,9 +128,8 @@ public class CassandraACLMapper {
             .orElseThrow(() -> new MailboxException("Unable to update ACL"));
     }
 
-    private Mono<ACLDiff> updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) throws MailboxException {
-        return Mono.fromRunnable(() -> codeInjector.inject())
-            .then(Mono.defer(() -> getAclWithVersion(cassandraId)))
+    private Mono<ACLDiff> updateAcl(CassandraId cassandraId, Function<ACLWithVersion, ACLWithVersion> aclTransformation, MailboxACL replacement) {
+        return getAclWithVersion(cassandraId)
             .flatMap(aclWithVersion ->
                     updateStoredACL(cassandraId, aclTransformation.apply(aclWithVersion))
                             .map(newACL -> ACLDiff.computeDiff(aclWithVersion.mailboxACL, newACL)))
diff --git a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
index d112645..025a41a 100644
--- a/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
+++ b/mailbox/cassandra/src/test/java/org/apache/james/mailbox/cassandra/mail/CassandraACLMapperTest.java
@@ -33,6 +33,7 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.james.backends.cassandra.CassandraCluster;
 import org.apache.james.backends.cassandra.CassandraClusterExtension;
+import org.apache.james.backends.cassandra.TestingSession.Barrier;
 import org.apache.james.backends.cassandra.init.configuration.CassandraConfiguration;
 import org.apache.james.backends.cassandra.utils.CassandraUtils;
 import org.apache.james.mailbox.cassandra.ids.CassandraId;
@@ -160,12 +161,22 @@ class CassandraACLMapperTest {
 
     @Test
     void twoConcurrentUpdatesWhenNoACLStoredShouldReturnACLWithTwoEntries(CassandraCluster cassandra) throws Exception {
-        CountDownLatch countDownLatch = new CountDownLatch(2);
+        Barrier barrier = new Barrier(2);
+        cassandra.getConf()
+            .awaitOn(barrier)
+            .whenBoundStatementStartsWith("SELECT acl,version FROM acl WHERE id=:id;")
+            .times(2)
+            .setExecutionHook();
+
         MailboxACL.EntryKey keyBob = new MailboxACL.EntryKey("bob", MailboxACL.NameType.user, false);
         MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read);
         MailboxACL.EntryKey keyAlice = new MailboxACL.EntryKey("alice", MailboxACL.NameType.user, false);
-        Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights, countDownLatch::countDown);
-        Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown);
+        Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights);
+        Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights);
+
+        barrier.awaitCaller();
+        barrier.releaseCaller();
+
         awaitAll(future1, future2);
 
         assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
@@ -179,10 +190,21 @@ class CassandraACLMapperTest {
         MailboxACL.Rfc4314Rights rights = new MailboxACL.Rfc4314Rights(MailboxACL.Right.Read);
         cassandraACLMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(keyBenwa).rights(rights).asAddition());
 
+        Barrier barrier = new Barrier(2);
+        cassandra.getConf()
+            .awaitOn(barrier)
+            .whenBoundStatementStartsWith("SELECT acl,version FROM acl WHERE id=:id;")
+            .times(2)
+            .setExecutionHook();
+
         MailboxACL.EntryKey keyBob = new MailboxACL.EntryKey("bob", MailboxACL.NameType.user, false);
         MailboxACL.EntryKey keyAlice = new MailboxACL.EntryKey("alice", MailboxACL.NameType.user, false);
-        Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights, countDownLatch::countDown);
-        Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights, countDownLatch::countDown);
+        Future<Boolean> future1 = performACLUpdateInExecutor(cassandra, executor, keyBob, rights);
+        Future<Boolean> future2 = performACLUpdateInExecutor(cassandra, executor, keyAlice, rights);
+
+        barrier.awaitCaller();
+        barrier.releaseCaller();
+
         awaitAll(future1, future2);
 
         assertThat(cassandraACLMapper.getACL(MAILBOX_ID).block())
@@ -196,13 +218,12 @@ class CassandraACLMapperTest {
         }
     }
 
-    private Future<Boolean> performACLUpdateInExecutor(CassandraCluster cassandra, ExecutorService executor, MailboxACL.EntryKey key, MailboxACL.Rfc4314Rights rights, CassandraACLMapper.CodeInjector runnable) {
+    private Future<Boolean> performACLUpdateInExecutor(CassandraCluster cassandra, ExecutorService executor, MailboxACL.EntryKey key, MailboxACL.Rfc4314Rights rights) {
         return executor.submit(() -> {
             CassandraACLMapper aclMapper = new CassandraACLMapper(
                 cassandra.getConf(),
                 new CassandraUserMailboxRightsDAO(cassandra.getConf(), CassandraUtils.WITH_DEFAULT_CONFIGURATION),
-                CassandraConfiguration.DEFAULT_CONFIGURATION,
-                runnable);
+                CassandraConfiguration.DEFAULT_CONFIGURATION);
             try {
                 aclMapper.updateACL(MAILBOX_ID, MailboxACL.command().key(key).rights(rights).asAddition());
             } catch (MailboxException exception) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org