You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2015/07/03 16:39:10 UTC

svn commit: r1689024 - /james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java

Author: btellier
Date: Fri Jul  3 14:39:09 2015
New Revision: 1689024

URL: http://svn.apache.org/r1689024
Log:
MAILBOX-208 Refactor Cassandra Uid provider (similar to Cassandra Modseq provider)

Modified:
    james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java

Modified: james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java
URL: http://svn.apache.org/viewvc/james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java?rev=1689024&r1=1689023&r2=1689024&view=diff
==============================================================================
--- james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java (original)
+++ james/mailbox/trunk/cassandra/src/main/java/org/apache/james/mailbox/cassandra/mail/CassandraUidProvider.java Fri Jul  3 14:39:09 2015
@@ -19,34 +19,42 @@
 
 package org.apache.james.mailbox.cassandra.mail;
 
+import java.util.Optional;
+
 import static com.datastax.driver.core.querybuilder.QueryBuilder.eq;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.insertInto;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.select;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.set;
 import static com.datastax.driver.core.querybuilder.QueryBuilder.update;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.MAILBOX_ID;
+import static org.apache.james.mailbox.cassandra.CassandraConstants.LIGHTWEIGHT_TRANSACTION_APPLIED;
 import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.NEXT_UID;
-import static org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable.TABLE_NAME;
 
+import com.datastax.driver.core.querybuilder.BuiltStatement;
+import com.google.common.base.Throwables;
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.cassandra.CassandraId;
+import org.apache.james.mailbox.cassandra.mail.utils.FunctionRunnerWithRetry;
+import org.apache.james.mailbox.cassandra.table.CassandraMessageUidTable;
 import org.apache.james.mailbox.exception.MailboxException;
 import org.apache.james.mailbox.store.mail.UidProvider;
 import org.apache.james.mailbox.store.mail.model.Mailbox;
 
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class CassandraUidProvider implements UidProvider<CassandraId> {
     public final static int DEFAULT_MAX_RETRY = 100000;
+    private static final Logger LOG = LoggerFactory.getLogger(CassandraUidProvider.class);
+    private static final Uid FIRST_UID = new Uid(0);
 
-    private Session session;
-    private final int applied = 0;
-    private int maxRetry;
+    private final Session session;
+    private final FunctionRunnerWithRetry runner;
 
     public CassandraUidProvider(Session session, int maxRetry) {
         this.session = session;
-        this.maxRetry = maxRetry;
+        this.runner = new FunctionRunnerWithRetry(maxRetry);
     }
 
     public CassandraUidProvider(Session session) {
@@ -55,42 +63,86 @@ public class CassandraUidProvider implem
 
     @Override
     public long nextUid(MailboxSession mailboxSession, Mailbox<CassandraId> mailbox) throws MailboxException {
-        long lastUid = lastUid(mailboxSession, mailbox);
-        if (lastUid == 0) {
-            ResultSet result = session.execute(
-                    insertInto(TABLE_NAME)
-                    .value(NEXT_UID, ++lastUid)
-                    .value(MAILBOX_ID, mailbox.getMailboxId().asUuid())
-                    .ifNotExists());
-            if(result.one().getBool(applied)) {
-                return lastUid;
+        if (findHighestUid(mailbox).isFirst()) {
+            Optional<Uid> optional = tryInsertUid(mailbox, FIRST_UID);
+            if (optional.isPresent()) {
+                return optional.get().getValue();
             }
         }
-        int tries = 0;
-        boolean isApplied;
-        do {
-            tries++;
-            lastUid = lastUid(mailboxSession, mailbox);
-            ResultSet result = session.execute(
-                    update(TABLE_NAME)
-                    .onlyIf(eq(NEXT_UID, lastUid))
-                    .with(set(NEXT_UID, ++lastUid))
-                    .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())));
-            isApplied = result.one().getBool(applied);
-        } while (! isApplied && tries < maxRetry);
-        if( ! isApplied ) {
-            throw new MailboxException("Can not obtain rights to manage UID");
-        }
-        return lastUid;
+
+        return runner.executeAndRetrieveObject(
+            () -> {
+                try {
+                    return tryUpdateUid(mailbox, findHighestUid(mailbox))
+                        .map(Uid::getValue);
+                } catch (Exception exception) {
+                    LOG.error("Can not retrieve next Uid", exception);
+                    throw Throwables.propagate(exception);
+                }
+            });
     }
 
     @Override
     public long lastUid(MailboxSession mailboxSession, Mailbox<CassandraId> mailbox) throws MailboxException {
+        return findHighestUid(mailbox).getValue();
+    }
+
+    private Uid findHighestUid(Mailbox<CassandraId> mailbox) throws MailboxException {
         ResultSet result = session.execute(
-                select(NEXT_UID)
-                .from(TABLE_NAME)
-                .where(eq(MAILBOX_ID, mailbox.getMailboxId().asUuid())));
-        return result.isExhausted() ? 0 : result.one().getLong(NEXT_UID);
+            select(NEXT_UID)
+                .from(CassandraMessageUidTable.TABLE_NAME)
+                .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailbox.getMailboxId().asUuid())));
+        if (result.isExhausted()) {
+            return FIRST_UID;
+        } else {
+            return new Uid(result.one().getLong(NEXT_UID));
+        }
+    }
+
+    private Optional<Uid> tryInsertUid(Mailbox<CassandraId> mailbox, Uid uid) {
+        Uid nextUid = uid.next();
+        return transactionalStatementToOptionalUid(nextUid,
+            insertInto(CassandraMessageUidTable.TABLE_NAME)
+                .value(NEXT_UID, nextUid.getValue())
+                .value(CassandraMessageUidTable.MAILBOX_ID, mailbox.getMailboxId().asUuid())
+                .ifNotExists());
+    }
+
+    private Optional<Uid> tryUpdateUid(Mailbox<CassandraId> mailbox, Uid uid) {
+        Uid nextUid = uid.next();
+        return transactionalStatementToOptionalUid(nextUid,
+            update(CassandraMessageUidTable.TABLE_NAME)
+                .onlyIf(eq(NEXT_UID, uid.getValue()))
+                .with(set(NEXT_UID, nextUid.getValue()))
+                .where(eq(CassandraMessageUidTable.MAILBOX_ID, mailbox.getMailboxId().asUuid())));
+    }
+
+    private Optional<Uid> transactionalStatementToOptionalUid(Uid uid, BuiltStatement statement) {
+        if(session.execute(statement).one().getBool(LIGHTWEIGHT_TRANSACTION_APPLIED)) {
+            return Optional.of(uid);
+        }
+        return Optional.empty();
+    }
+
+    private static class Uid {
+
+        private final long value;
+
+        public Uid(long value) {
+            this.value = value;
+        }
+
+        public Uid next() {
+            return new Uid(value + 1);
+        }
+
+        public long getValue() {
+            return value;
+        }
+
+        public boolean isFirst() {
+            return value == FIRST_UID.value;
+        }
     }
 
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org