You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/06/28 13:21:53 UTC
[03/17] cassandra git commit: Avoid stalling Paxos when the paxos
state expires
Avoid stalling Paxos when the paxos state expires
This commit does 2 things:
- It ignores MRCs that are old enough to have expired in some nodes' paxos tables
- It ensures the same timestamp is used when reading the paxos state and ignoring old MRC
patch by slebresne; reviewed by jasobrown for CASSANDRA-12043
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/017ec3e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/017ec3e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/017ec3e9
Branch: refs/heads/cassandra-3.0
Commit: 017ec3e99e704db5e1a36ad153af08d6e7eca523
Parents: 2811f15
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jun 22 12:12:37 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 28 15:16:00 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/cassandra/cql3/QueryProcessor.java | 28 +++++++++++++++++++-
.../cql3/statements/SelectStatement.java | 6 ++++-
.../org/apache/cassandra/db/SystemKeyspace.java | 6 ++---
.../apache/cassandra/service/StorageProxy.java | 2 +-
.../cassandra/service/paxos/PaxosState.java | 11 ++++++--
.../service/paxos/PrepareCallback.java | 18 ++++++++++++-
7 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5741241..feeaded 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
* Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
* Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
* Prevent select statements with clustering key > 64k (CASSANDRA-11882)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index d4ca76f..4340d42 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -296,7 +296,7 @@ public class QueryProcessor implements QueryHandler
return QueryOptions.forInternalCalls(boundValues);
}
- private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+ public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
{
ParsedStatement.Prepared prepared = internalStatements.get(query);
if (prepared != null)
@@ -374,6 +374,32 @@ public class QueryProcessor implements QueryHandler
}
}
+ /**
+ * A special version of executeInternal that takes the time used as "now" for the query in argument.
+ * Note that this only makes sense for SELECTs, so this only accepts SELECT statements and is only useful in rare
+ * cases.
+ */
+ public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values)
+ {
+ try
+ {
+ ParsedStatement.Prepared prepared = prepareInternal(query);
+ assert prepared.statement instanceof SelectStatement;
+ SelectStatement select = (SelectStatement)prepared.statement;
+ ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now);
+ assert result instanceof ResultMessage.Rows;
+ return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+ }
+ catch (RequestExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ catch (RequestValidationException e)
+ {
+ throw new RuntimeException("Error validating query " + query, e);
+ }
+ }
+
public static UntypedResultSet resultify(String query, Row row)
{
return resultify(query, Collections.singletonList(row));
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1e142e0..6351bb5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -312,8 +312,12 @@ public class SelectStatement implements CQLStatement
public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
{
+ return executeInternal(state, options, System.currentTimeMillis());
+ }
+
+ public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException
+ {
int limit = getLimit(options);
- long now = System.currentTimeMillis();
Pageable command = getPageableCommand(options, limit, now);
int pageSize = options.getPageSize();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1f66b1b..f8cf1ab 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -896,10 +896,10 @@ public class SystemKeyspace
return new Row(key, cf);
}
- public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+ public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now)
{
String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
- UntypedResultSet results = executeInternal(String.format(req, PAXOS_CF), key, metadata.cfId);
+ UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS_CF), key, metadata.cfId);
if (results.isEmpty())
return new PaxosState(key, metadata);
UntypedResultSet.Row row = results.one();
@@ -939,7 +939,7 @@ public class SystemKeyspace
proposal.update.id());
}
- private static int paxosTtl(CFMetaData metadata)
+ public static int paxosTtl(CFMetaData metadata)
{
// keep paxos state around for at least 3h
return Math.max(3 * 3600, metadata.getGcGraceSeconds());
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index af0693b..cddc7e9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -421,7 +421,7 @@ public class StorageProxy implements StorageProxyMBean
// https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
// Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
// mean we lost messages), we pro-actively "repair" those nodes, and retry.
- Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
+ Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, ballotMicros);
if (Iterables.size(missingMRC) > 0)
{
Tracing.trace("Repairing replicas that missed the most recent commit");
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 01e03f4..fde881b 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -63,7 +63,13 @@ public class PaxosState
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+ // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+ // is expired or not) across nodes, otherwise we may have a window where a Most Recent Commit shows up
+ // on some replica and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+ // amount of re-submit will fix this (because the node on which the commit has expired will have a
+ // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
+ long now = UUIDGen.unixTimestamp(toPrepare.ballot);
+ PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
if (toPrepare.isAfter(state.promised))
{
Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -98,7 +104,8 @@ public class PaxosState
lock.lock();
try
{
- PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
+ long now = UUIDGen.unixTimestamp(proposal.ballot);
+ PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now);
if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
{
Tracing.trace("Accepting proposal {}", proposal);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index a446b0b..2859a69 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
import java.net.InetAddress;
import java.nio.ByteBuffer;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,7 +34,9 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.UUIDGen;
public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
{
@@ -86,8 +89,21 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
latch.countDown();
}
- public Iterable<InetAddress> replicasMissingMostRecentCommit()
+ public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now)
{
+ // In general, we need every replica that has answered the prepare (a quorum) to agree on the MRC (see the
+ // comment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
+ // has learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have
+ // reached a quorum and further commits may proceed on incomplete information).
+ // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't
+ // keep the paxos state forever or that could grow unchecked), and we could end up in some infinite loop as
+ // explained on CASSANDRA-12043. To avoid that, we ignore a MRC that is too old, i.e. older than the TTL we set
+ // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been
+ // propagated to all nodes.
+ long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000L * 1000L; // paxosTtl() is an int number of seconds (>= 10800): multiply as long, int math overflows
+ if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now)
+ return Collections.emptySet();
+
return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
{
public boolean apply(InetAddress inetAddress)