You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2013/09/16 08:45:41 UTC
[2/3] git commit: CAS should distinguish promised and accepted ballots
CAS should distinguish promised and accepted ballots
patch by slebresne; reviewed by jbellis for CASSANDRA-6023
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/bab28e4a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/bab28e4a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/bab28e4a
Branch: refs/heads/trunk
Commit: bab28e4ae4cf420f24433c24ea64177d78a7307b
Parents: 6ec4eef
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Mon Sep 16 08:43:46 2013 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Mon Sep 16 08:43:46 2013 +0200
----------------------------------------------------------------------
CHANGES.txt | 2 +-
.../org/apache/cassandra/config/CFMetaData.java | 1 +
.../org/apache/cassandra/db/SystemKeyspace.java | 38 ++++++++++----------
.../cassandra/service/paxos/PaxosState.java | 35 +++++++++---------
.../service/paxos/PrepareCallback.java | 7 ++--
.../service/paxos/PrepareResponse.java | 6 ++++
6 files changed, 47 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5517cee..2f01b7d 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -15,7 +15,7 @@
* Require superuser status for adding triggers (CASSANDRA-5963)
* Make standalone scrubber handle old and new style leveled manifest
(CASSANDRA-6005)
- * Fix paxos bugs (CASSANDRA-6012, 6013)
+ * Fix paxos bugs (CASSANDRA-6012, 6013, 6023)
Merged from 1.2:
1.2.10
* Fix possible divide-by-zero in HHOM (CASSANDRA-5990)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/config/CFMetaData.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/config/CFMetaData.java b/src/java/org/apache/cassandra/config/CFMetaData.java
index be3da21..939163d 100644
--- a/src/java/org/apache/cassandra/config/CFMetaData.java
+++ b/src/java/org/apache/cassandra/config/CFMetaData.java
@@ -252,6 +252,7 @@ public final class CFMetaData
+ "row_key blob,"
+ "cf_id UUID,"
+ "in_progress_ballot timeuuid,"
+ + "proposal_ballot timeuuid,"
+ "proposal blob,"
+ "most_recent_commit_at timeuuid,"
+ "most_recent_commit blob,"
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 7759114..3e608b3 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -782,14 +782,16 @@ public class SystemKeyspace
if (results.isEmpty())
return new PaxosState(key, metadata);
UntypedResultSet.Row row = results.one();
- Commit inProgress = new Commit(key,
- row.getUUID("in_progress_ballot"),
- row.has("proposal") ? ColumnFamily.fromBytes(row.getBytes("proposal")) : EmptyColumns.factory.create(metadata));
+ Commit promised = new Commit(key, row.getUUID("in_progress_ballot"), EmptyColumns.factory.create(metadata));
+ // either we have both a recently accepted ballot and update or we have neither
+ Commit accepted = row.has("proposal")
+ ? new Commit(key, row.getUUID("proposal_ballot"), ColumnFamily.fromBytes(row.getBytes("proposal")))
+ : Commit.emptyCommit(key, metadata);
// either most_recent_commit and most_recent_commit_at will both be set, or neither
Commit mostRecent = row.has("most_recent_commit")
? new Commit(key, row.getUUID("most_recent_commit_at"), ColumnFamily.fromBytes(row.getBytes("most_recent_commit")))
: Commit.emptyCommit(key, metadata);
- return new PaxosState(inProgress, mostRecent);
+ return new PaxosState(promised, accepted, mostRecent);
}
public static void savePaxosPromise(Commit promise)
@@ -804,16 +806,16 @@ public class SystemKeyspace
promise.update.id()));
}
- public static void savePaxosProposal(Commit commit)
+ public static void savePaxosProposal(Commit proposal)
{
- processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
+ processInternal(String.format("UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = %s, proposal = 0x%s WHERE row_key = 0x%s AND cf_id = %s",
PAXOS_CF,
- UUIDGen.microsTimestamp(commit.ballot),
- paxosTtl(commit.update.metadata),
- commit.ballot,
- ByteBufferUtil.bytesToHex(commit.update.toBytes()),
- ByteBufferUtil.bytesToHex(commit.key),
- commit.update.id()));
+ UUIDGen.microsTimestamp(proposal.ballot),
+ paxosTtl(proposal.update.metadata),
+ proposal.ballot,
+ ByteBufferUtil.bytesToHex(proposal.update.toBytes()),
+ ByteBufferUtil.bytesToHex(proposal.key),
+ proposal.update.id()));
}
private static int paxosTtl(CFMetaData metadata)
@@ -822,17 +824,15 @@ public class SystemKeyspace
return Math.max(3 * 3600, metadata.getGcGraceSeconds());
}
- public static void savePaxosCommit(Commit commit, UUID inProgressBallot)
+ public static void savePaxosCommit(Commit commit)
{
- String preserveCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
- // identical except adds proposal = null
- String eraseCql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal = null, in_progress_ballot = %s, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
- boolean proposalAfterCommit = inProgressBallot.timestamp() > commit.ballot.timestamp();
- processInternal(String.format(proposalAfterCommit ? preserveCql : eraseCql,
+ // We always erase the last proposal (with the commit timestamp, so as to not erase a more recent proposal in case the commit is old)
+ // even though that's really just an optimization since SP.beginAndRepairPaxos will exclude accepted proposals older than the mrc.
+ String cql = "UPDATE %s USING TIMESTAMP %d AND TTL %d SET proposal_ballot = null, proposal = null, most_recent_commit_at = %s, most_recent_commit = 0x%s WHERE row_key = 0x%s AND cf_id = %s";
+ processInternal(String.format(cql,
PAXOS_CF,
UUIDGen.microsTimestamp(commit.ballot),
paxosTtl(commit.update.metadata),
- proposalAfterCommit ? inProgressBallot : commit.ballot,
commit.ballot,
ByteBufferUtil.bytesToHex(commit.update.toBytes()),
ByteBufferUtil.bytesToHex(commit.key),
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index aa27628..ff0b02c 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -48,20 +48,22 @@ public class PaxosState
return locks[(0x7FFFFFFF & key.hashCode()) % locks.length];
}
- private final Commit inProgressCommit;
+ private final Commit promised;
+ private final Commit accepted;
private final Commit mostRecentCommit;
public PaxosState(ByteBuffer key, CFMetaData metadata)
{
- this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
+ this(Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata), Commit.emptyCommit(key, metadata));
}
- public PaxosState(Commit inProgressCommit, Commit mostRecentCommit)
+ public PaxosState(Commit promised, Commit accepted, Commit mostRecentCommit)
{
- assert inProgressCommit.key == mostRecentCommit.key;
- assert inProgressCommit.update.metadata() == inProgressCommit.update.metadata();
+ assert promised.key == accepted.key && accepted.key == mostRecentCommit.key;
+ assert promised.update.metadata() == accepted.update.metadata() && accepted.update.metadata() == mostRecentCommit.update.metadata();
- this.inProgressCommit = inProgressCommit;
+ this.promised = promised;
+ this.accepted = accepted;
this.mostRecentCommit = mostRecentCommit;
}
@@ -70,17 +72,17 @@ public class PaxosState
synchronized (lockFor(toPrepare.key))
{
PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
- if (toPrepare.isAfter(state.inProgressCommit))
+ if (toPrepare.isAfter(state.promised))
{
Tracing.trace("Promising ballot {}", toPrepare.ballot);
SystemKeyspace.savePaxosPromise(toPrepare);
- // return the pre-promise ballot so coordinator can pick the most recent in-progress value to resume
- return new PrepareResponse(true, state.inProgressCommit, state.mostRecentCommit);
+ return new PrepareResponse(true, state.accepted, state.mostRecentCommit);
}
else
{
- Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.inProgressCommit);
- return new PrepareResponse(false, state.inProgressCommit, state.mostRecentCommit);
+ Tracing.trace("Promise rejected; {} is not sufficiently newer than {}", toPrepare, state.promised);
+ // return the currently promised ballot (not the last accepted one) so the coordinator can make sure it uses a newer ballot next time (#5667)
+ return new PrepareResponse(false, state.promised, state.mostRecentCommit);
}
}
}
@@ -90,7 +92,7 @@ public class PaxosState
synchronized (lockFor(proposal.key))
{
PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
- if (proposal.hasBallot(state.inProgressCommit.ballot) || proposal.isAfter(state.inProgressCommit))
+ if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
{
Tracing.trace("Accepting proposal {}", proposal);
SystemKeyspace.savePaxosProposal(proposal);
@@ -98,7 +100,7 @@ public class PaxosState
}
else
{
- Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.inProgressCommit);
+ Tracing.trace("Rejecting proposal for {} because inProgress is now {}", proposal, state.promised);
return false;
}
}
@@ -115,10 +117,7 @@ public class PaxosState
RowMutation rm = proposal.makeMutation();
Keyspace.open(rm.getKeyspaceName()).apply(rm, true);
- synchronized (lockFor(proposal.key))
- {
- PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
- SystemKeyspace.savePaxosCommit(proposal, state.inProgressCommit.ballot);
- }
+ // We don't need to lock, we're just blindly updating
+ SystemKeyspace.savePaxosCommit(proposal);
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 9293254..04a18b9 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,8 +23,8 @@ package org.apache.cassandra.service.paxos;
import java.net.InetAddress;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
@@ -43,7 +43,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
public Commit mostRecentInProgressCommit;
public Commit mostRecentInProgressCommitWithUpdate;
- private Map<InetAddress, Commit> commitsByReplica = new HashMap<InetAddress, Commit>();
+ private final Map<InetAddress, Commit> commitsByReplica = new ConcurrentHashMap<InetAddress, Commit>();
public PrepareCallback(ByteBuffer key, CFMetaData metadata, int targets)
{
@@ -73,6 +73,7 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
return;
}
+ commitsByReplica.put(message.from, response.mostRecentCommit);
if (response.mostRecentCommit.isAfter(mostRecentCommit))
mostRecentCommit = response.mostRecentCommit;
@@ -81,8 +82,6 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
if (response.inProgressCommit.isAfter(mostRecentInProgressCommitWithUpdate) && !response.inProgressCommit.update.isEmpty())
mostRecentInProgressCommitWithUpdate = response.inProgressCommit;
-
-
latch.countDown();
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/bab28e4a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
index 9f5fda6..d2bd835 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareResponse.java
@@ -38,6 +38,12 @@ public class PrepareResponse
public static final PrepareResponseSerializer serializer = new PrepareResponseSerializer();
public final boolean promised;
+
+ /*
+ * To maintain backward compatibility (see #6023), the meaning of inProgressCommit is a bit tricky.
+ * If promised is true, then that's the last accepted commit. If promised is false, that's just
+ * the previously promised ballot that made us refuse this one.
+ */
public final Commit inProgressCommit;
public final Commit mostRecentCommit;