You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/02/23 03:39:02 UTC

[2/4] james-project git commit: JAMES-1945 Handle modSeq computation as future

JAMES-1945 Handle modSeq computation as future

Prepare statements for Cassandra ModSeqs


Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f2b7cd70
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f2b7cd70
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f2b7cd70

Branch: refs/heads/master
Commit: f2b7cd70787072ceccc13442951118272c56cc85
Parents: 9200405
Author: Benoit Tellier <bt...@linagora.com>
Authored: Mon Feb 20 10:47:03 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Feb 23 10:37:41 2017 +0700

----------------------------------------------------------------------
 .../cassandra/mail/CassandraModSeqProvider.java | 173 ++++++++++++-------
 1 file changed, 110 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/james-project/blob/f2b7cd70/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index af4ca5d..1a6a885 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -29,38 +30,87 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
 
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 
 import javax.inject.Inject;
 
-import org.apache.james.backends.cassandra.utils.CassandraConstants;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
 import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.CassandraId;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.model.MailboxId;
 import org.apache.james.mailbox.store.mail.ModSeqProvider;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
 import com.datastax.driver.core.Session;
-import com.datastax.driver.core.querybuilder.BuiltStatement;
-import com.google.common.base.Throwables;
+import com.google.common.base.Supplier;
 
 public class CassandraModSeqProvider implements ModSeqProvider {
 
+    public static final String MOD_SEQ_CONDITION = "modSeqCondition";
+
+    public static class ExceptionRelay extends RuntimeException {
+        private final MailboxException underlying;
+
+        public ExceptionRelay(MailboxException underlying) {
+            super(underlying);
+            this.underlying = underlying;
+        }
+
+        public MailboxException getUnderlying() {
+            return underlying;
+        }
+    }
+
+    private static <T> T unbox(Supplier<T> supplier) throws MailboxException {
+        try {
+            return supplier.get();
+        } catch (CompletionException e) {
+            if (e.getCause() instanceof ExceptionRelay) {
+                throw ((ExceptionRelay) e.getCause()).getUnderlying();
+            }
+            throw e;
+        }
+    }
+
     private static final int DEFAULT_MAX_RETRY = 100000;
-    private static final Logger LOG = LoggerFactory.getLogger(CassandraModSeqProvider.class);
     private static final ModSeq FIRST_MODSEQ = new ModSeq(0);
-    
-    private final Session session;
+
+    private final CassandraAsyncExecutor cassandraAsyncExecutor;
     private final FunctionRunnerWithRetry runner;
+    private final PreparedStatement select;
+    private final PreparedStatement update;
+    private final PreparedStatement insert;
 
     public CassandraModSeqProvider(Session session, int maxRetry) {
-        this.session = session;
+        this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
         this.runner = new FunctionRunnerWithRetry(maxRetry);
+        this.insert = prepareInsert(session);
+        this.update = prepareUpdate(session);
+        this.select = prepareSelect(session);
+    }
+
+    private PreparedStatement prepareInsert(Session session) {
+        return session.prepare(insertInto(TABLE_NAME)
+            .value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ))
+            .value(MAILBOX_ID, bindMarker(MAILBOX_ID))
+            .ifNotExists());
+    }
+
+    private PreparedStatement prepareUpdate(Session session) {
+        return session.prepare(update(TABLE_NAME)
+            .onlyIf(eq(NEXT_MODSEQ, bindMarker(MOD_SEQ_CONDITION)))
+            .with(set(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ)))
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
+    }
+
+    private PreparedStatement prepareSelect(Session session) {
+        return session.prepare(select(NEXT_MODSEQ)
+            .from(TABLE_NAME)
+            .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
     }
 
     @Inject
@@ -71,87 +121,84 @@ public class CassandraModSeqProvider implements ModSeqProvider {
     @Override
     public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
         CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
-        return nextModSeq(mailboxId);
+        return nextModSeq(mailboxId).join()
+            .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
     }
 
     @Override
     public long nextModSeq(MailboxSession session, MailboxId mailboxId) throws MailboxException {
-        return nextModSeq((CassandraId)mailboxId);
+        return nextModSeq((CassandraId) mailboxId)
+            .join()
+            .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
     }
 
     @Override
     public long highestModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
-        return findHighestModSeq((CassandraId) mailbox.getMailboxId()).getValue();
+        return unbox(() -> findHighestModSeq((CassandraId) mailbox.getMailboxId()).join().getValue());
     }
 
     @Override
     public long highestModSeq(MailboxSession mailboxSession, MailboxId mailboxId) throws MailboxException {
-        return findHighestModSeq((CassandraId) mailboxId).getValue();
-    }
-
-    private ModSeq findHighestModSeq(CassandraId mailboxId) throws MailboxException {
-        ResultSet result = session.execute(
-            select(NEXT_MODSEQ)
-                .from(TABLE_NAME)
-                .where(eq(MAILBOX_ID, mailboxId.asUuid())));
-        if (result.isExhausted()) {
-            return FIRST_MODSEQ;
-        } else {
-            return new ModSeq(result.one().getLong(NEXT_MODSEQ));
-        }
+        return unbox(() -> findHighestModSeq((CassandraId) mailboxId).join().getValue());
+    }
+
+    private CompletableFuture<ModSeq> findHighestModSeq(CassandraId mailboxId) {
+        return cassandraAsyncExecutor.executeSingleRow(
+            select.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid()))
+            .thenApply(optional -> optional.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ)))
+                .orElse(FIRST_MODSEQ));
     }
 
-    private Optional<ModSeq> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
+    private CompletableFuture<Optional<ModSeq>> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
         ModSeq nextModSeq = modSeq.next();
-        return transactionalStatementToOptionalModSeq(nextModSeq,
-            insertInto(TABLE_NAME)
-                .value(NEXT_MODSEQ, nextModSeq.getValue())
-                .value(MAILBOX_ID, mailboxId.asUuid())
-                .ifNotExists());
+        return cassandraAsyncExecutor.executeReturnApplied(
+            insert.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                .setLong(NEXT_MODSEQ, nextModSeq.getValue()))
+            .thenApply(success -> successToModSeq(nextModSeq, success));
     }
 
-    private Optional<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
+    private CompletableFuture<Optional<ModSeq>> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
         ModSeq nextModSeq = modSeq.next();
-        return transactionalStatementToOptionalModSeq(nextModSeq,
-            update(TABLE_NAME)
-                .onlyIf(eq(NEXT_MODSEQ, modSeq.getValue()))
-                .with(set(NEXT_MODSEQ, nextModSeq.getValue()))
-                .where(eq(MAILBOX_ID, mailboxId.asUuid())));
+        return cassandraAsyncExecutor.executeReturnApplied(
+            update.bind()
+                .setUUID(MAILBOX_ID, mailboxId.asUuid())
+                .setLong(NEXT_MODSEQ, nextModSeq.getValue())
+                .setLong(MOD_SEQ_CONDITION, modSeq.getValue()))
+            .thenApply(success -> successToModSeq(nextModSeq, success));
     }
 
-    private Optional<ModSeq> transactionalStatementToOptionalModSeq(ModSeq modSeq, BuiltStatement statement) {
-        if(session.execute(statement).one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED)) {
+    private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
+        if (success) {
             return Optional.of(modSeq);
         }
         return Optional.empty();
     }
     
-    private long nextModSeq(CassandraId mailboxId) throws MailboxException {
-        if (findHighestModSeq(mailboxId).isFirst()) {
-            Optional<ModSeq> optional = tryInsertModSeq(mailboxId, FIRST_MODSEQ);
-            if (optional.isPresent()) {
-                return optional.get().getValue();
-            }
-        }
-
-        try {
-            return runner.executeAndRetrieveObject(
-                () -> {
-                    try {
-                        return tryUpdateModSeq(mailboxId, findHighestModSeq(mailboxId))
-                            .map(ModSeq::getValue);
-                    } catch (Exception exception) {
-                        LOG.error("Can not retrieve next ModSeq", exception);
-                        throw Throwables.propagate(exception);
+    public CompletableFuture<Optional<Long>> nextModSeq(CassandraId mailboxId) {
+        return findHighestModSeq(mailboxId)
+            .thenCompose(modSeq -> {
+                if (modSeq.isFirst()) {
+                    return tryInsertModSeq(mailboxId, FIRST_MODSEQ);
+                }
+                return tryUpdateModSeq(mailboxId, modSeq);
+            }).thenCompose(firstInsert -> {
+                    if (firstInsert.isPresent()) {
+                        return CompletableFuture.completedFuture(firstInsert);
                     }
-                });
-        } catch (LightweightTransactionException e) {
-            throw new MailboxException("Error during ModSeq update", e);
-        }
+                    return handleRetries(mailboxId);
+                })
+            .thenApply(optional -> optional.map(ModSeq::getValue));
+    }
+
+    private CompletableFuture<Optional<ModSeq>> handleRetries(CassandraId mailboxId) {
+        return runner.executeAsyncAndRetrieveObject(
+            () -> findHighestModSeq(mailboxId)
+                .thenCompose(newModSeq -> tryUpdateModSeq(mailboxId, newModSeq)));
     }
 
     private static class ModSeq {
-        
         private final long value;
         
         public ModSeq(long value) {


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org