You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/09/13 12:07:31 UTC

git commit: Fix paxos not always replaying when it should

Updated Branches:
  refs/heads/cassandra-2.0 fa70e064c -> 50c9d77e1


Fix paxos not always replaying when it should

patch by slebresne; reviewed by jbellis for CASSANDRA-6012


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/50c9d77e
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/50c9d77e
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/50c9d77e

Branch: refs/heads/cassandra-2.0
Commit: 50c9d77e1b2be206a765e7f86cd4b8ecf3d232df
Parents: fa70e06
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Fri Sep 13 12:06:46 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Fri Sep 13 12:06:46 2013 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/service/StorageProxy.java  |  4 ++--
 .../service/paxos/PrepareCallback.java          | 20 ++++++++++++++++----
 3 files changed, 19 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/50c9d77e/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 814029d..ebdfddb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,6 +15,7 @@
  * Require superuser status for adding triggers (CASSANDRA-5963)
  * Make standalone scrubber handle old and new style leveled manifest
    (CASSANDRA-6005)
+ * Fix paxos not always replaying when it should  (CASSANDRA-6012)
 Merged from 1.2:
 1.2.10
  * Fix possible divide-by-zero in HHOM (CASSANDRA-5990)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50c9d77e/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 1184bb5..83cb265 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -357,7 +357,7 @@ public class StorageProxy implements StorageProxyMBean
         {
             long ballotMillis = summary == null
                               ? System.currentTimeMillis()
-                              : Math.max(System.currentTimeMillis(), 1 + UUIDGen.unixTimestamp(summary.inProgressCommit.ballot));
+                              : Math.max(System.currentTimeMillis(), 1 + UUIDGen.unixTimestamp(summary.mostRecentInProgressCommit.ballot));
             UUID ballot = UUIDGen.getTimeUUID(ballotMillis);
 
             // prepare
@@ -372,7 +372,7 @@ public class StorageProxy implements StorageProxyMBean
                 continue;
             }
 
-            Commit inProgress = summary.inProgressCommit;
+            Commit inProgress = summary.mostRecentInProgressCommitWithUpdate;
             Commit mostRecent = summary.mostRecentCommit;
 
             // If we have an in-progress ballot greater than the MRC we know, then it's an in-progress round that

http://git-wip-us.apache.org/repos/asf/cassandra/blob/50c9d77e/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 23d7952..9293254 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -40,7 +40,8 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 
     public boolean promised = true;
     public Commit mostRecentCommit;
-    public Commit inProgressCommit;
+    public Commit mostRecentInProgressCommit;
+    public Commit mostRecentInProgressCommitWithUpdate;
 
     private Map<InetAddress, Commit> commitsByReplica = new HashMap<InetAddress, Commit>();
 
@@ -49,7 +50,8 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         super(targets);
         // need to inject the right key in the empty commit so comparing with empty commits in the reply works as expected
         mostRecentCommit = Commit.emptyCommit(key, metadata);
-        inProgressCommit = Commit.emptyCommit(key, metadata);
+        mostRecentInProgressCommit = Commit.emptyCommit(key, metadata);
+        mostRecentInProgressCommitWithUpdate = Commit.emptyCommit(key, metadata);
     }
 
     public synchronized void response(MessageIn<PrepareResponse> message)
@@ -57,6 +59,12 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         PrepareResponse response = message.payload;
         logger.debug("Prepare response {} from {}", response, message.from);
 
+        // In case of clock skew, another node could be proposing with ballots that are quite a bit
+        // older than our own. In that case, we record the most recent commit we've received to make
+        // sure we re-prepare on an older ballot.
+        if (response.inProgressCommit.isAfter(mostRecentInProgressCommit))
+            mostRecentInProgressCommit = response.inProgressCommit;
+
         if (!response.promised)
         {
             promised = false;
@@ -68,8 +76,12 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         if (response.mostRecentCommit.isAfter(mostRecentCommit))
             mostRecentCommit = response.mostRecentCommit;
 
-        if (response.inProgressCommit.isAfter(inProgressCommit))
-            inProgressCommit = response.inProgressCommit;
+        // If some response has an update, then we should replay the update with the highest ballot. So find
+        // the highest commit that actually has an update
+        if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty())
+            mostRecentInProgressCommitWithUpdate = response.inProgressCommit;
+
+
 
         latch.countDown();
     }