You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/06/28 13:22:06 UTC
[16/17] cassandra git commit: Avoid stalling Paxos when the paxos
state expires
Avoid stalling Paxos when the paxos state expires
This commit does 2 things:
- It ignores MRCs that are old enough to have expired in some nodes' paxos tables
- It ensures the same timestamp is used when reading the paxos state and ignoring old MRCs
patch by slebresne; reviewed by jasobrown for CASSANDRA-12043
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53f8f095
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53f8f095
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53f8f095
Branch: refs/heads/cassandra-3.9
Commit: 53f8f09575dfbdc166b257fe71a4d4a832470253
Parents: d289778
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jun 22 12:12:37 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 28 15:21:05 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../org/apache/cassandra/cql3/QueryProcessor.java | 17 ++++++++++++++++-
.../cql3/statements/SelectStatement.java | 6 +++++-
.../org/apache/cassandra/db/SystemKeyspace.java | 12 ++++++------
.../apache/cassandra/service/StorageProxy.java | 4 +++-
.../cassandra/service/paxos/PaxosState.java | 11 +++++++++--
.../cassandra/service/paxos/PrepareCallback.java | 18 +++++++++++++++++-
src/java/org/apache/cassandra/utils/UUIDGen.java | 12 ++++++++++++
8 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd13896..075d44a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
3.9
Merged from 2.1:
+ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
* Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index d812af4..222204b 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -258,7 +258,7 @@ public class QueryProcessor implements QueryHandler
return QueryOptions.forInternalCalls(cl, boundValues);
}
- private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+ public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
{
ParsedStatement.Prepared prepared = internalStatements.get(query);
if (prepared != null)
@@ -331,6 +331,21 @@ public class QueryProcessor implements QueryHandler
return null;
}
+ /**
+ * A special version of executeInternal that takes the time used as "now" for the query in argument.
+ * Note that this only makes sense for Selects so this only accepts SELECT statements and is only useful in rare
+ * cases.
+ */
+ public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... values)
+ {
+ ParsedStatement.Prepared prepared = prepareInternal(query);
+ assert prepared.statement instanceof SelectStatement;
+ SelectStatement select = (SelectStatement)prepared.statement;
+ ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec);
+ assert result instanceof ResultMessage.Rows;
+ return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+ }
+
public static UntypedResultSet resultify(String query, RowIterator partition)
{
return resultify(query, PartitionIterators.singletonIterator(partition));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5f37e5e..f2b484e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -414,7 +414,11 @@ public class SelectStatement implements CQLStatement
public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
- int nowInSec = FBUtilities.nowInSeconds();
+ return executeInternal(state, options, FBUtilities.nowInSeconds());
+ }
+
+ public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException
+ {
int userLimit = getLimit(options);
int userPerPartitionLimit = getPerPartitionLimit(options);
ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1203834..584279d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1103,10 +1103,10 @@ public final class SystemKeyspace
return null;
}
- public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata)
+ public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec)
{
String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
- UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId);
+ UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId);
if (results.isEmpty())
return new PaxosState(key, metadata);
UntypedResultSet.Row row = results.one();
@@ -1131,7 +1131,7 @@ public final class SystemKeyspace
String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
executeInternal(String.format(req, PAXOS),
UUIDGen.microsTimestamp(promise.ballot),
- paxosTtl(promise.update.metadata()),
+ paxosTtlSec(promise.update.metadata()),
promise.ballot,
promise.update.partitionKey().getKey(),
promise.update.metadata().cfId);
@@ -1141,7 +1141,7 @@ public final class SystemKeyspace
{
executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
UUIDGen.microsTimestamp(proposal.ballot),
- paxosTtl(proposal.update.metadata()),
+ paxosTtlSec(proposal.update.metadata()),
proposal.ballot,
PartitionUpdate.toBytes(proposal.update, MessagingService.current_version),
MessagingService.current_version,
@@ -1149,7 +1149,7 @@ public final class SystemKeyspace
proposal.update.metadata().cfId);
}
- private static int paxosTtl(CFMetaData metadata)
+ public static int paxosTtlSec(CFMetaData metadata)
{
// keep paxos state around for at least 3h
return Math.max(3 * 3600, metadata.params.gcGraceSeconds);
@@ -1162,7 +1162,7 @@ public final class SystemKeyspace
String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?";
executeInternal(String.format(cql, PAXOS),
UUIDGen.microsTimestamp(commit.ballot),
- paxosTtl(commit.update.metadata()),
+ paxosTtlSec(commit.update.metadata()),
commit.ballot,
PartitionUpdate.toBytes(commit.update, MessagingService.current_version),
MessagingService.current_version,
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 90c246e..c88c449 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
import com.google.common.base.Predicate;
import com.google.common.cache.CacheLoader;
import com.google.common.collect.*;
+import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@@ -440,7 +441,8 @@ public class StorageProxy implements StorageProxyMBean
// https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
// Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
// mean we lost messages), we pro-actively "repair" those nodes, and retry.
- Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
+ int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
+ Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec);
if (Iterables.size(missingMRC) > 0)
{
Tracing.trace("Repairing replicas that missed the most recent commit");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0b3af8f..e01f568 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -65,7 +65,13 @@ public class PaxosState
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata());
+ // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+ // is expired or not) across nodes otherwise we may have a window where a Most Recent Commit shows up
+ // on some replica and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+ // amount of re-submit will fix this (because the node on which the commit has expired will have a
+ // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
+ int nowInSec = UUIDGen.unixTimestampInSec(toPrepare.ballot);
+ PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata(), nowInSec);
if (toPrepare.isAfter(state.promised))
{
Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -100,7 +106,8 @@ public class PaxosState
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata());
+ int nowInSec = UUIDGen.unixTimestampInSec(proposal.ballot);
+ PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata(), nowInSec);
if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
{
Tracing.trace("Accepting proposal {}", proposal);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 9c54b01..ff81803 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,7 +35,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.UUIDGen;
public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
{
@@ -87,8 +90,21 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
latch.countDown();
}
- public Iterable<InetAddress> replicasMissingMostRecentCommit()
+ public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec)
{
+ // In general, we need every replica that has answered to the prepare (a quorum) to agree on the MRC (see
+ // comment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
+ // have learned a commit before committing a new one otherwise that previous commit is not guaranteed to have reached a
+ // quorum and further commits may proceed on incomplete information).
+ // However, if that commit is too old, it may have been expired from some of the replicas' paxos tables (we don't
+ // keep the paxos state forever or that could grow unchecked), and we could end up in some infinite loop as
+ // explained on CASSANDRA-12043. To avoid that, we ignore a MRC that is too old, i.e. older than the TTL we set
+ // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been
+ // propagated to all nodes.
+ long paxosTtlSec = SystemKeyspace.paxosTtlSec(metadata);
+ if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec)
+ return Collections.emptySet();
+
return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
{
public boolean apply(InetAddress inetAddress)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 00efbe3..a8b3093 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -26,8 +26,11 @@ import java.util.Collection;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Ints;
/**
* The goods are here: www.ietf.org/rfc/rfc4122.txt.
@@ -211,6 +214,15 @@ public class UUIDGen
/**
* @param uuid
+ * @return seconds since Unix epoch
+ */
+ public static int unixTimestampInSec(UUID uuid)
+ {
+ return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(unixTimestamp(uuid)));
+ }
+
+ /**
+ * @param uuid
* @return microseconds since Unix epoch
*/
public static long microsTimestamp(UUID uuid)