You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/06/28 13:21:53 UTC

[03/17] cassandra git commit: Avoid stalling Paxos when the paxos state expires

Avoid stalling Paxos when the paxos state expires

This commit does 2 things:
- It ignores MRCs (most recent commits) that are old enough to have expired from some nodes' paxos tables
- It ensures the same timestamp is used both when reading the paxos state and when ignoring old MRCs

patch by slebresne; reviewed by jasobrown for CASSANDRA-12043


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/017ec3e9
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/017ec3e9
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/017ec3e9

Branch: refs/heads/cassandra-3.0
Commit: 017ec3e99e704db5e1a36ad153af08d6e7eca523
Parents: 2811f15
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jun 22 12:12:37 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 28 15:16:00 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/cassandra/cql3/QueryProcessor.java   | 28 +++++++++++++++++++-
 .../cql3/statements/SelectStatement.java        |  6 ++++-
 .../org/apache/cassandra/db/SystemKeyspace.java |  6 ++---
 .../apache/cassandra/service/StorageProxy.java  |  2 +-
 .../cassandra/service/paxos/PaxosState.java     | 11 ++++++--
 .../service/paxos/PrepareCallback.java          | 18 ++++++++++++-
 7 files changed, 63 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 5741241..feeaded 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
  * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
  * Don't try to get sstables for non-repairing column families (CASSANDRA-12077)
  * Prevent select statements with clustering key > 64k (CASSANDRA-11882)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index d4ca76f..4340d42 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -296,7 +296,7 @@ public class QueryProcessor implements QueryHandler
         return QueryOptions.forInternalCalls(boundValues);
     }
 
-    private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+    public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
     {
         ParsedStatement.Prepared prepared = internalStatements.get(query);
         if (prepared != null)
@@ -374,6 +374,32 @@ public class QueryProcessor implements QueryHandler
         }
     }
 
+    /**
+     * A special version of executeInternal that takes the time to use as "now" for the query as an argument.
+     * Note that this only makes sense for SELECTs, so it only accepts SELECT statements (asserted below); checked
+     * request exceptions are rethrown wrapped in a RuntimeException. It is only useful in rare cases.
+     */
+    public static UntypedResultSet executeInternalWithNow(long now, String query, Object... values)
+    {
+        try
+        {
+            ParsedStatement.Prepared prepared = prepareInternal(query);
+            assert prepared.statement instanceof SelectStatement;
+            SelectStatement select = (SelectStatement)prepared.statement;
+            ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), now);
+            assert result instanceof ResultMessage.Rows;
+            return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+        }
+        catch (RequestExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+        catch (RequestValidationException e)
+        {
+            throw new RuntimeException("Error validating query " + query, e);
+        }
+    }
+
     public static UntypedResultSet resultify(String query, Row row)
     {
         return resultify(query, Collections.singletonList(row));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 1e142e0..6351bb5 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -312,8 +312,12 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
+        return executeInternal(state, options, System.currentTimeMillis());
+    }
+
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, long now) throws RequestExecutionException, RequestValidationException
+    {
         int limit = getLimit(options);
-        long now = System.currentTimeMillis();
         Pageable command = getPageableCommand(options, limit, now);
 
         int pageSize = options.getPageSize();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1f66b1b..f8cf1ab 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -896,10 +896,10 @@ public class SystemKeyspace
         return new Row(key, cf);
     }
 
-    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata)
+    public static PaxosState loadPaxosState(ByteBuffer key, CFMetaData metadata, long now)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = executeInternal(String.format(req, PAXOS_CF), key, metadata.cfId);
+        UntypedResultSet results = QueryProcessor.executeInternalWithNow(now, String.format(req, PAXOS_CF), key, metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
@@ -939,7 +939,7 @@ public class SystemKeyspace
                         proposal.update.id());
     }
 
-    private static int paxosTtl(CFMetaData metadata)
+    public static int paxosTtl(CFMetaData metadata)
     {
         // keep paxos state around for at least 3h
         return Math.max(3 * 3600, metadata.getGcGraceSeconds());

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index af0693b..cddc7e9 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -421,7 +421,7 @@ public class StorageProxy implements StorageProxyMBean
             // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
             // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
             // mean we lost messages), we pro-actively "repair" those nodes, and retry.
-            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
+            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, ballotMicros);
             if (Iterables.size(missingMRC) > 0)
             {
                 Tracing.trace("Repairing replicas that missed the most recent commit");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 01e03f4..fde881b 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -63,7 +63,13 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata());
+                // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+                // is expired or not) across nodes, otherwise we may have a window where a Most Recent Commit shows up
+                // on some replicas and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+                // amount of re-submit will fix this (because the node on which the commit has expired will have a
+                // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
+                long now = UUIDGen.unixTimestamp(toPrepare.ballot);
+                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.key, toPrepare.update.metadata(), now);
                 if (toPrepare.isAfter(state.promised))
                 {
                     Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -98,7 +104,8 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata());
+                long now = UUIDGen.unixTimestamp(proposal.ballot);
+                PaxosState state = SystemKeyspace.loadPaxosState(proposal.key, proposal.update.metadata(), now);
                 if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
                 {
                     Tracing.trace("Accepting proposal {}", proposal);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/017ec3e9/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index a446b0b..2859a69 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -33,7 +34,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 {
@@ -86,8 +89,21 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         latch.countDown();
     }
 
-    public Iterable<InetAddress> replicasMissingMostRecentCommit()
+    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, long now)
     {
+        // In general, we need every replica that has answered the prepare (a quorum) to agree on the MRC (see the
+        // comment in StorageProxy.beginAndRepairPaxos(), but basically we need to make sure at least a quorum of nodes
+        // has learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have
+        // reached a quorum and further commits may proceed on incomplete information).
+        // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't
+        // keep the paxos state forever or that could grow unchecked), and we could end up in some infinite loop as
+        // explained on CASSANDRA-12043. To avoid that, we ignore a MRC that is too old, i.e. older than the TTL we set
+        // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been
+        // propagated to all nodes.
+        long paxosTtlMicros = SystemKeyspace.paxosTtl(metadata) * 1000L * 1000L;
+        if (UUIDGen.microsTimestamp(mostRecentCommit.ballot) + paxosTtlMicros < now)
+            return Collections.emptySet();
+
         return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
         {
             public boolean apply(InetAddress inetAddress)