You are viewing a plain text version of this content; the hyperlink to the canonical version was not preserved in this rendering.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/09/16 08:45:19 UTC

[2/2] git commit: CAS should distinguish promised and accepted ballots

CAS should distinguish promised and accepted ballots

patch by slebresne; reviewed by jbellis for CASSANDRA-6023


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bab28e4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bab28e4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bab28e4a

Branch: refs/heads/cassandra-2.0
Commit: bab28e4ae4cf420f24433c24ea64177d78a7307b
Parents: 6ec4eef
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Sep 16 08:43:46 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Sep 16 08:43:46 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  2 +-
 .../org/apache/cassandra/config/CFMetaData.java |  1 +
 .../org/apache/cassandra/db/SystemKeyspace.java | 38 ++++++++++----------
 .../cassandra/service/paxos/PaxosState.java     | 35 +++++++++---------
 .../service/paxos/PrepareCallback.java          |  7 ++--
 .../service/paxos/PrepareResponse.java          |  6 ++++
 6 files changed, 47 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5517cee..2f01b7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,7 +15,7 @@
  * Require superuser status for adding triggers (CASSANDRA-5963)
  * Make standalone scrubber handle old and new style leveled manifest
    (CASSANDRA-6005)
- * Fix paxos bugs (CASSANDRA-6012, 6013)
+ * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
 Merged from 1.2:
 1.2.10
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index be3da21..939163d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -252,6 +252,7 @@ public final class CFMetaData
                                                      + "row_key blob,"
                                                      + "cf_id UUID,"
                                                      + "in_progress_ballot timeuuid,"
+                                                     + "proposal_ballot timeuuid,"
                                                      + "proposal blob,"
                                                      + "most_recent_commit_at timeuuid,"
                                                      + "most_recent_commit blob,"

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 7759114..3e608b3 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -782,14 +782,16 @@ public class SystemKeyspace
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
-        Commit inProgress = new Commit(key,
-                                       row.getUUID("in_progress_ballot"),
-                                       row.has("proposal") ? ColumnFamily.fromBytes(row.getBytes("proposal")) : EmptyColumns.factory.create(metadata));
+        Commit promised = new Commit(key, row.getUUID("in_progress_ballot"), EmptyColumns.factory.create(metadata));
+        // either we have both a recently accepted ballot and update or we have neither
+        Commit accepted = row.has("proposal")
+                        ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal")))
+                        : Commit.emptyCommit(key, metadata);
         // either most_recent_commit and most_recent_commit_at will both be set, or neither
         Commit mostRecent = row.has("most_recent_commit")
                           ? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
                           : Commit.emptyCommit(key, metadata);
-        return new PaxosState(inProgress, mostRecent);
+        return new PaxosState(promised, accepted, mostRecent);
     }
 
     public static void savePaxosPromise(Commit promise)
@@ -804,16 +806,16 @@ public class SystemKeyspace
                                       promise.update.id()));
     }
 
-    public static void savePaxosProposal(Commit commit)
+    public static void savePaxosProposal(Commit proposal)
     {
-        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
+        processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
                                       PAXOS_CF,
-                                      UUIDGen.microsTimestamp(commit.ballot),
-                                      paxosTtl(commit.update.metadata),
-                                      commit.ballot,
-                                      ByteBufferUtil.bytesToHex(commit.update.toBytes()),
-                                      ByteBufferUtil.bytesToHex(commit.key),
-                                      commit.update.id()));
+                                      UUIDGen.microsTimestamp(proposal.ballot),
+                                      paxosTtl(proposal.update.metadata),
+                                      proposal.ballot,
+                                      ByteBufferUtil.bytesToHex(proposal.update.toBytes()),
+                                      ByteBufferUtil.bytesToHex(proposal.key),
+                                      proposal.update.id()));
     }
 
     private static int paxosTtl(CFMetaData metadata)
@@ -822,17 +824,15 @@ public class SystemKeyspace
         return Math.max(3 * 3600, metadata.getGcGraceSeconds());
     }
 
-    public static void savePaxosCommit(Commit commit, UUID inProgressBallot)
+    public static void savePaxosCommit(Commit commit)
     {
-        String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
-        // identical except adds proposal = null
-        String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
-        boolean proposalAfterCommit = inProgressBallot.timestamp() > commit.ballot.timestamp();
-        processInternal(String.format(proposalAfterCommit ? preserveCql : eraseCql,
+        // We always erase the last proposal (using the commit timestamp, so as not to erase a more recent proposal in case the commit is old),
+        // even though that's really just an optimization, since SP.beginAndRepairPaxos will exclude accepted proposals older than the most recent commit.
+        String cql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = null, proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+        processInternal(String.format(cql,
                                       PAXOS_CF,
                                       UUIDGen.microsTimestamp(commit.ballot),
                                       paxosTtl(commit.update.metadata),
-                                      proposalAfterCommit ? inProgressBallot : commit.ballot,
                                       commit.ballot,
                                       ByteBufferUtil.bytesToHex(commit.update.toBytes()),
                                       ByteBufferUtil.bytesToHex(commit.key),

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index aa27628..ff0b02c 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -48,20 +48,22 @@ public class PaxosState
         return locks[(0x7FFFFFFF & key.hashCode()) % locks.length];
     }
 
-    private final Commit inProgressCommit;
+    private final Commit promised;
+    private final Commit accepted;
     private final Commit mostRecentCommit;
 
     public PaxosState(ByteBuffer key, CFMetaData metadata)
     {
-        this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
+        this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
     }
 
-    public PaxosState(Commit inProgressCommit, Commit mostRecentCommit)
+    public PaxosState(Commit promised, Commit accepted, Commit mostRecentCommit)
     {
-        assert inProgressCommit.key == mostRecentCommit.key;
-        assert inProgressCommit.update.metadata() == inProgressCommit.update.metadata();
+        assert promised.key == accepted.key && accepted.key == mostRecentCommit.key;
+        assert promised.update.metadata() == accepted.update.metadata() && accepted.update.metadata() == mostRecentCommit.update.metadata();
 
-        this.inProgressCommit = inProgressCommit;
+        this.promised = promised;
+        this.accepted = accepted;
         this.mostRecentCommit = mostRecentCommit;
     }
 
@@ -70,17 +72,17 @@ public class PaxosState
         synchronized (lockFor(toPrepare.key))
         {
             PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
-            if (toPrepare.isAfter(state.inProgressCommit))
+            if (toPrepare.isAfter(state.promised))
             {
                 Tracing.trace("Promising ballot {}", toPrepare.ballot);
                 SystemKeyspace.savePaxosPromise(toPrepare);
-                // return the pre-promise ballot so coordinator can pick the most recent in-progress value to resume
-                return new PrepareResponse(true, state.inProgressCommit, state.mostRecentCommit);
+                return new PrepareResponse(true, state.accepted, state.mostRecentCommit);
             }
             else
             {
-                Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.inProgressCommit);
-                return new PrepareResponse(false, state.inProgressCommit, state.mostRecentCommit);
+                Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised);
+                // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses a newer ballot next time (#5667)
+                return new PrepareResponse(false, state.promised, state.mostRecentCommit);
             }
         }
     }
@@ -90,7 +92,7 @@ public class PaxosState
         synchronized (lockFor(proposal.key))
         {
             PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
-            if (proposal.hasBallot(state.inProgressCommit.ballot) || proposal.isAfter(state.inProgressCommit))
+            if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
             {
                 Tracing.trace("Accepting proposal {}", proposal);
                 SystemKeyspace.savePaxosProposal(proposal);
@@ -98,7 +100,7 @@ public class PaxosState
             }
             else
             {
-                Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.inProgressCommit);
+                Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised);
                 return false;
             }
         }
@@ -115,10 +117,7 @@ public class PaxosState
         RowMutation rm = proposal.makeMutation();
         Keyspace.open(rm.getKeyspaceName()).apply(rm, true);
 
-        synchronized (lockFor(proposal.key))
-        {
-            PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
-            SystemKeyspace.savePaxosCommit(proposal, state.inProgressCommit.ballot);
-        }
+        // We don't need to lock, we're just blindly updating
+        SystemKeyspace.savePaxosCommit(proposal);
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 9293254..04a18b9 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,8 +23,8 @@ package org.apache.cassandra.service.paxos;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterables;
@@ -43,7 +43,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
     public Commit mostRecentInProgressCommit;
     public Commit mostRecentInProgressCommitWithUpdate;
 
-    private Map<InetAddress, Commit> commitsByReplica = new HashMap<InetAddress, Commit>();
+    private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
 
     public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets)
     {
@@ -73,6 +73,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
             return;
         }
 
+        commitsByReplica.put(message.from, response.mostRecentCommit);
         if (response.mostRecentCommit.isAfter(mostRecentCommit))
             mostRecentCommit = response.mostRecentCommit;
 
@@ -81,8 +82,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty())
             mostRecentInProgressCommitWithUpdate = response.inProgressCommit;
 
-
-
         latch.countDown();
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index 9f5fda6..d2bd835 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -38,6 +38,12 @@ public class PrepareResponse
     public static final PrepareResponseSerializer serializer = new PrepareResponseSerializer();
 
     public final boolean promised;
+
+    /*
+     * To maintain backward compatibility (see #6023), the meaning of inProgressCommit is a bit tricky.
+     * If promised is true, then that's the last accepted commit. If promised is false, that's just
+     * the previously promised ballot that made us refuse this one.
+     */
     public final Commit inProgressCommit;
     public final Commit mostRecentCommit;