You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2017/02/23 03:39:02 UTC
[2/4] james-project git commit: JAMES-1945 Handle modSeq computation
as future
JAMES-1945 Handle modSeq computation as future
Prepare statements for Cassandra ModSeqs
Project: http://git-wip-us.apache.org/repos/asf/james-project/repo
Commit: http://git-wip-us.apache.org/repos/asf/james-project/commit/f2b7cd70
Tree: http://git-wip-us.apache.org/repos/asf/james-project/tree/f2b7cd70
Diff: http://git-wip-us.apache.org/repos/asf/james-project/diff/f2b7cd70
Branch: refs/heads/master
Commit: f2b7cd70787072ceccc13442951118272c56cc85
Parents: 9200405
Author: Benoit Tellier <bt...@linagora.com>
Authored: Mon Feb 20 10:47:03 2017 +0700
Committer: benwa <bt...@linagora.com>
Committed: Thu Feb 23 10:37:41 2017 +0700
----------------------------------------------------------------------
.../cassandra/mail/CassandraModSeqProvider.java | 173 ++++++++++++-------
1 file changed, 110 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/james-project/blob/f2b7cd70/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
----------------------------------------------------------------------
diff --git a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
index af4ca5d..1a6a885 100644
--- a/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
+++ b/mailbox/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraModSeqProvider.java
@@ -19,6 +19,7 @@
package org.apache.james.mailbox.cassandra.mail;
+import static com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker;
import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
@@ -29,38 +30,87 @@ import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTab
import static org.apache.james.mailbox.cassandra.table.CassandraMessageModseqTable.TABLE_NAME;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import javax.inject.Inject;
-import org.apache.james.backends.cassandra.utils.CassandraConstants;
+import org.apache.james.backends.cassandra.utils.CassandraAsyncExecutor;
import org.apache.james.backends.cassandra.utils.FunctionRunnerWithRetry;
-import org.apache.james.backends.cassandra.utils.LightweightTransactionException;
import org.apache.james.mailbox.MailboxSession;
import org.apache.james.mailbox.cassandra.CassandraId;
import org.apache.james.mailbox.exception.MailboxException;
import org.apache.james.mailbox.model.MailboxId;
import org.apache.james.mailbox.store.mail.ModSeqProvider;
import org.apache.james.mailbox.store.mail.model.Mailbox;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
-import com.datastax.driver.core.querybuilder.BuiltStatement;
-import com.google.common.base.Throwables;
+import com.google.common.base.Supplier;
public class CassandraModSeqProvider implements ModSeqProvider {
+ public static final String MOD_SEQ_CONDITION = "modSeqCondition";
+
+ public static class ExceptionRelay extends RuntimeException {
+ private final MailboxException underlying;
+
+ public ExceptionRelay(MailboxException underlying) {
+ super(underlying);
+ this.underlying = underlying;
+ }
+
+ public MailboxException getUnderlying() {
+ return underlying;
+ }
+ }
+
+ private static <T> T unbox(Supplier<T> supplier) throws MailboxException {
+ try {
+ return supplier.get();
+ } catch (CompletionException e) {
+ if (e.getCause() instanceof ExceptionRelay) {
+ throw ((ExceptionRelay) e.getCause()).getUnderlying();
+ }
+ throw e;
+ }
+ }
+
private static final int DEFAULT_MAX_RETRY = 100000;
- private static final Logger LOG = LoggerFactory.getLogger(CassandraModSeqProvider.class);
private static final ModSeq FIRST_MODSEQ = new ModSeq(0);
-
- private final Session session;
+
+ private final CassandraAsyncExecutor cassandraAsyncExecutor;
private final FunctionRunnerWithRetry runner;
+ private final PreparedStatement select;
+ private final PreparedStatement update;
+ private final PreparedStatement insert;
public CassandraModSeqProvider(Session session, int maxRetry) {
- this.session = session;
+ this.cassandraAsyncExecutor = new CassandraAsyncExecutor(session);
this.runner = new FunctionRunnerWithRetry(maxRetry);
+ this.insert = prepareInsert(session);
+ this.update = prepareUpdate(session);
+ this.select = prepareSelect(session);
+ }
+
+ private PreparedStatement prepareInsert(Session session) {
+ return session.prepare(insertInto(TABLE_NAME)
+ .value(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ))
+ .value(MAILBOX_ID, bindMarker(MAILBOX_ID))
+ .ifNotExists());
+ }
+
+ private PreparedStatement prepareUpdate(Session session) {
+ return session.prepare(update(TABLE_NAME)
+ .onlyIf(eq(NEXT_MODSEQ, bindMarker(MOD_SEQ_CONDITION)))
+ .with(set(NEXT_MODSEQ, bindMarker(NEXT_MODSEQ)))
+ .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
+ }
+
+ private PreparedStatement prepareSelect(Session session) {
+ return session.prepare(select(NEXT_MODSEQ)
+ .from(TABLE_NAME)
+ .where(eq(MAILBOX_ID, bindMarker(MAILBOX_ID))));
}
@Inject
@@ -71,87 +121,84 @@ public class CassandraModSeqProvider implements ModSeqProvider {
@Override
public long nextModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
CassandraId mailboxId = (CassandraId) mailbox.getMailboxId();
- return nextModSeq(mailboxId);
+ return nextModSeq(mailboxId).join()
+ .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
}
@Override
public long nextModSeq(MailboxSession session, MailboxId mailboxId) throws MailboxException {
- return nextModSeq((CassandraId)mailboxId);
+ return nextModSeq((CassandraId) mailboxId)
+ .join()
+ .orElseThrow(() -> new MailboxException("Can not retrieve modseq for " + mailboxId));
}
@Override
public long highestModSeq(MailboxSession mailboxSession, Mailbox mailbox) throws MailboxException {
- return findHighestModSeq((CassandraId) mailbox.getMailboxId()).getValue();
+ return unbox(() -> findHighestModSeq((CassandraId) mailbox.getMailboxId()).join().getValue());
}
@Override
public long highestModSeq(MailboxSession mailboxSession, MailboxId mailboxId) throws MailboxException {
- return findHighestModSeq((CassandraId) mailboxId).getValue();
- }
-
- private ModSeq findHighestModSeq(CassandraId mailboxId) throws MailboxException {
- ResultSet result = session.execute(
- select(NEXT_MODSEQ)
- .from(TABLE_NAME)
- .where(eq(MAILBOX_ID, mailboxId.asUuid())));
- if (result.isExhausted()) {
- return FIRST_MODSEQ;
- } else {
- return new ModSeq(result.one().getLong(NEXT_MODSEQ));
- }
+ return unbox(() -> findHighestModSeq((CassandraId) mailboxId).join().getValue());
+ }
+
+ private CompletableFuture<ModSeq> findHighestModSeq(CassandraId mailboxId) {
+ return cassandraAsyncExecutor.executeSingleRow(
+ select.bind()
+ .setUUID(MAILBOX_ID, mailboxId.asUuid()))
+ .thenApply(optional -> optional.map(row -> new ModSeq(row.getLong(NEXT_MODSEQ)))
+ .orElse(FIRST_MODSEQ));
}
- private Optional<ModSeq> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
+ private CompletableFuture<Optional<ModSeq>> tryInsertModSeq(CassandraId mailboxId, ModSeq modSeq) {
ModSeq nextModSeq = modSeq.next();
- return transactionalStatementToOptionalModSeq(nextModSeq,
- insertInto(TABLE_NAME)
- .value(NEXT_MODSEQ, nextModSeq.getValue())
- .value(MAILBOX_ID, mailboxId.asUuid())
- .ifNotExists());
+ return cassandraAsyncExecutor.executeReturnApplied(
+ insert.bind()
+ .setUUID(MAILBOX_ID, mailboxId.asUuid())
+ .setLong(NEXT_MODSEQ, nextModSeq.getValue()))
+ .thenApply(success -> successToModSeq(nextModSeq, success));
}
- private Optional<ModSeq> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
+ private CompletableFuture<Optional<ModSeq>> tryUpdateModSeq(CassandraId mailboxId, ModSeq modSeq) {
ModSeq nextModSeq = modSeq.next();
- return transactionalStatementToOptionalModSeq(nextModSeq,
- update(TABLE_NAME)
- .onlyIf(eq(NEXT_MODSEQ, modSeq.getValue()))
- .with(set(NEXT_MODSEQ, nextModSeq.getValue()))
- .where(eq(MAILBOX_ID, mailboxId.asUuid())));
+ return cassandraAsyncExecutor.executeReturnApplied(
+ update.bind()
+ .setUUID(MAILBOX_ID, mailboxId.asUuid())
+ .setLong(NEXT_MODSEQ, nextModSeq.getValue())
+ .setLong(MOD_SEQ_CONDITION, modSeq.getValue()))
+ .thenApply(success -> successToModSeq(nextModSeq, success));
}
- private Optional<ModSeq> transactionalStatementToOptionalModSeq(ModSeq modSeq, BuiltStatement statement) {
- if(session.execute(statement).one().getBool(CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED)) {
+ private Optional<ModSeq> successToModSeq(ModSeq modSeq, Boolean success) {
+ if (success) {
return Optional.of(modSeq);
}
return Optional.empty();
}
- private long nextModSeq(CassandraId mailboxId) throws MailboxException {
- if (findHighestModSeq(mailboxId).isFirst()) {
- Optional<ModSeq> optional = tryInsertModSeq(mailboxId, FIRST_MODSEQ);
- if (optional.isPresent()) {
- return optional.get().getValue();
- }
- }
-
- try {
- return runner.executeAndRetrieveObject(
- () -> {
- try {
- return tryUpdateModSeq(mailboxId, findHighestModSeq(mailboxId))
- .map(ModSeq::getValue);
- } catch (Exception exception) {
- LOG.error("Can not retrieve next ModSeq", exception);
- throw Throwables.propagate(exception);
+ public CompletableFuture<Optional<Long>> nextModSeq(CassandraId mailboxId) {
+ return findHighestModSeq(mailboxId)
+ .thenCompose(modSeq -> {
+ if (modSeq.isFirst()) {
+ return tryInsertModSeq(mailboxId, FIRST_MODSEQ);
+ }
+ return tryUpdateModSeq(mailboxId, modSeq);
+ }).thenCompose(firstInsert -> {
+ if (firstInsert.isPresent()) {
+ return CompletableFuture.completedFuture(firstInsert);
}
- });
- } catch (LightweightTransactionException e) {
- throw new MailboxException("Error during ModSeq update", e);
- }
+ return handleRetries(mailboxId);
+ })
+ .thenApply(optional -> optional.map(ModSeq::getValue));
+ }
+
+ private CompletableFuture<Optional<ModSeq>> handleRetries(CassandraId mailboxId) {
+ return runner.executeAsyncAndRetrieveObject(
+ () -> findHighestModSeq(mailboxId)
+ .thenCompose(newModSeq -> tryUpdateModSeq(mailboxId, newModSeq)));
}
private static class ModSeq {
-
private final long value;
public ModSeq(long value) {
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org