You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by sl...@apache.org on 2016/06/28 13:22:06 UTC

[16/17] cassandra git commit: Avoid stalling Paxos when the paxos state expires

Avoid stalling Paxos when the paxos state expires

This commit does two things:
- It ignores MRCs (most recent commits) that are old enough to have expired from some nodes' paxos tables
- It ensures the same timestamp is used both when reading the paxos state and when deciding to ignore an old MRC

patch by slebresne; reviewed by jasobrown for CASSANDRA-12043


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/53f8f095
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/53f8f095
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/53f8f095

Branch: refs/heads/cassandra-3.9
Commit: 53f8f09575dfbdc166b257fe71a4d4a832470253
Parents: d289778
Author: Sylvain Lebresne <sy...@datastax.com>
Authored: Wed Jun 22 12:12:37 2016 +0200
Committer: Sylvain Lebresne <sy...@datastax.com>
Committed: Tue Jun 28 15:21:05 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                       |  1 +
 .../org/apache/cassandra/cql3/QueryProcessor.java | 17 ++++++++++++++++-
 .../cql3/statements/SelectStatement.java          |  6 +++++-
 .../org/apache/cassandra/db/SystemKeyspace.java   | 12 ++++++------
 .../apache/cassandra/service/StorageProxy.java    |  4 +++-
 .../cassandra/service/paxos/PaxosState.java       | 11 +++++++++--
 .../cassandra/service/paxos/PrepareCallback.java  | 18 +++++++++++++++++-
 src/java/org/apache/cassandra/utils/UUIDGen.java  | 12 ++++++++++++
 8 files changed, 69 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cd13896..075d44a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 3.9
 Merged from 2.1:
+ * Avoid stalling paxos when the paxos state expires (CASSANDRA-12043)
  * Remove finished incoming streaming connections from MessagingService (CASSANDRA-11854)
 
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/cql3/QueryProcessor.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/QueryProcessor.java b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
index d812af4..222204b 100644
--- a/src/java/org/apache/cassandra/cql3/QueryProcessor.java
+++ b/src/java/org/apache/cassandra/cql3/QueryProcessor.java
@@ -258,7 +258,7 @@ public class QueryProcessor implements QueryHandler
         return QueryOptions.forInternalCalls(cl, boundValues);
     }
 
-    private static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
+    public static ParsedStatement.Prepared prepareInternal(String query) throws RequestValidationException
     {
         ParsedStatement.Prepared prepared = internalStatements.get(query);
         if (prepared != null)
@@ -331,6 +331,21 @@ public class QueryProcessor implements QueryHandler
             return null;
     }
 
+    /**
+     * A special version of executeInternal that takes, as an argument, the time used as "now" for the query.
+     * Note that this only makes sense for SELECTs, so this only accepts SELECT statements and is only useful in rare
+     * cases.
+     */
+    public static UntypedResultSet executeInternalWithNow(int nowInSec, String query, Object... values)
+    {
+        ParsedStatement.Prepared prepared = prepareInternal(query);
+        assert prepared.statement instanceof SelectStatement;
+        SelectStatement select = (SelectStatement)prepared.statement;
+        ResultMessage result = select.executeInternal(internalQueryState(), makeInternalOptions(prepared, values), nowInSec);
+        assert result instanceof ResultMessage.Rows;
+        return UntypedResultSet.create(((ResultMessage.Rows)result).result);
+    }
+
     public static UntypedResultSet resultify(String query, RowIterator partition)
     {
         return resultify(query, PartitionIterators.singletonIterator(partition));

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
index 5f37e5e..f2b484e 100644
--- a/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
+++ b/src/java/org/apache/cassandra/cql3/statements/SelectStatement.java
@@ -414,7 +414,11 @@ public class SelectStatement implements CQLStatement
 
     public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options) throws RequestExecutionException, RequestValidationException
     {
-        int nowInSec = FBUtilities.nowInSeconds();
+        return executeInternal(state, options, FBUtilities.nowInSeconds());
+    }
+
+    public ResultMessage.Rows executeInternal(QueryState state, QueryOptions options, int nowInSec) throws RequestExecutionException, RequestValidationException
+    {
         int userLimit = getLimit(options);
         int userPerPartitionLimit = getPerPartitionLimit(options);
         ReadQuery query = getQuery(options, nowInSec, userLimit, userPerPartitionLimit);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/db/SystemKeyspace.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java
index 1203834..584279d 100644
--- a/src/java/org/apache/cassandra/db/SystemKeyspace.java
+++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -1103,10 +1103,10 @@ public final class SystemKeyspace
         return null;
     }
 
-    public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata)
+    public static PaxosState loadPaxosState(DecoratedKey key, CFMetaData metadata, int nowInSec)
     {
         String req = "SELECT * FROM system.%s WHERE row_key = ? AND cf_id = ?";
-        UntypedResultSet results = executeInternal(String.format(req, PAXOS), key.getKey(), metadata.cfId);
+        UntypedResultSet results = QueryProcessor.executeInternalWithNow(nowInSec, String.format(req, PAXOS), key.getKey(), metadata.cfId);
         if (results.isEmpty())
             return new PaxosState(key, metadata);
         UntypedResultSet.Row row = results.one();
@@ -1131,7 +1131,7 @@ public final class SystemKeyspace
         String req = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET in_progress_ballot = ? WHERE row_key = ? AND cf_id = ?";
         executeInternal(String.format(req, PAXOS),
                         UUIDGen.microsTimestamp(promise.ballot),
-                        paxosTtl(promise.update.metadata()),
+                        paxosTtlSec(promise.update.metadata()),
                         promise.ballot,
                         promise.update.partitionKey().getKey(),
                         promise.update.metadata().cfId);
@@ -1141,7 +1141,7 @@ public final class SystemKeyspace
     {
         executeInternal(String.format("UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = ?, proposal = ?, proposal_version = ? WHERE row_key = ? AND cf_id = ?", PAXOS),
                         UUIDGen.microsTimestamp(proposal.ballot),
-                        paxosTtl(proposal.update.metadata()),
+                        paxosTtlSec(proposal.update.metadata()),
                         proposal.ballot,
                         PartitionUpdate.toBytes(proposal.update, MessagingService.current_version),
                         MessagingService.current_version,
@@ -1149,7 +1149,7 @@ public final class SystemKeyspace
                         proposal.update.metadata().cfId);
     }
 
-    private static int paxosTtl(CFMetaData metadata)
+    public static int paxosTtlSec(CFMetaData metadata)
     {
         // keep paxos state around for at least 3h
         return Math.max(3 * 3600, metadata.params.gcGraceSeconds);
@@ -1162,7 +1162,7 @@ public final class SystemKeyspace
         String cql = "UPDATE system.%s USING TIMESTAMP ? AND TTL ? SET proposal_ballot = null, proposal = null, most_recent_commit_at = ?, most_recent_commit = ?, most_recent_commit_version = ? WHERE row_key = ? AND cf_id = ?";
         executeInternal(String.format(cql, PAXOS),
                         UUIDGen.microsTimestamp(commit.ballot),
-                        paxosTtl(commit.update.metadata()),
+                        paxosTtlSec(commit.update.metadata()),
                         commit.ballot,
                         PartitionUpdate.toBytes(commit.update, MessagingService.current_version),
                         MessagingService.current_version,

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/StorageProxy.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageProxy.java b/src/java/org/apache/cassandra/service/StorageProxy.java
index 90c246e..c88c449 100644
--- a/src/java/org/apache/cassandra/service/StorageProxy.java
+++ b/src/java/org/apache/cassandra/service/StorageProxy.java
@@ -31,6 +31,7 @@ import javax.management.ObjectName;
 import com.google.common.base.Predicate;
 import com.google.common.cache.CacheLoader;
 import com.google.common.collect.*;
+import com.google.common.primitives.Ints;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
@@ -440,7 +441,8 @@ public class StorageProxy implements StorageProxyMBean
             // https://issues.apache.org/jira/browse/CASSANDRA-5062?focusedCommentId=13619810&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13619810)
             // Since we waited for quorum nodes, if some of them haven't seen the last commit (which may just be a timing issue, but may also
             // mean we lost messages), we pro-actively "repair" those nodes, and retry.
-            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit();
+            int nowInSec = Ints.checkedCast(TimeUnit.MICROSECONDS.toSeconds(ballotMicros));
+            Iterable<InetAddress> missingMRC = summary.replicasMissingMostRecentCommit(metadata, nowInSec);
             if (Iterables.size(missingMRC) > 0)
             {
                 Tracing.trace("Repairing replicas that missed the most recent commit");

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/paxos/PaxosState.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PaxosState.java b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
index 0b3af8f..e01f568 100644
--- a/src/java/org/apache/cassandra/service/paxos/PaxosState.java
+++ b/src/java/org/apache/cassandra/service/paxos/PaxosState.java
@@ -65,7 +65,13 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata());
+                // When preparing, we need to use the same time as "now" (that's the time we use to decide if something
+                // is expired or not) across nodes, otherwise we may have a window where a Most Recent Commit shows up
+                // on some replicas and not others during a new proposal (in StorageProxy.beginAndRepairPaxos()), and no
+                // amount of re-submit will fix this (because the node on which the commit has expired will have a
+                // tombstone that hides any re-submit). See CASSANDRA-12043 for details.
+                int nowInSec = UUIDGen.unixTimestampInSec(toPrepare.ballot);
+                PaxosState state = SystemKeyspace.loadPaxosState(toPrepare.update.partitionKey(), toPrepare.update.metadata(), nowInSec);
                 if (toPrepare.isAfter(state.promised))
                 {
                     Tracing.trace("Promising ballot {}", toPrepare.ballot);
@@ -100,7 +106,8 @@ public class PaxosState
             lock.lock();
             try
             {
-                PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata());
+                int nowInSec = UUIDGen.unixTimestampInSec(proposal.ballot);
+                PaxosState state = SystemKeyspace.loadPaxosState(proposal.update.partitionKey(), proposal.update.metadata(), nowInSec);
                 if (proposal.hasBallot(state.promised.ballot) || proposal.isAfter(state.promised))
                 {
                     Tracing.trace("Accepting proposal {}", proposal);

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
index 9c54b01..ff81803 100644
--- a/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
+++ b/src/java/org/apache/cassandra/service/paxos/PrepareCallback.java
@@ -23,6 +23,7 @@ package org.apache.cassandra.service.paxos;
 
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -34,7 +35,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.config.CFMetaData;
+import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.net.MessageIn;
+import org.apache.cassandra.utils.UUIDGen;
 
 public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
 {
@@ -87,8 +90,21 @@ public class PrepareCallback extends AbstractPaxosCallback<PrepareResponse>
         latch.countDown();
     }
 
-    public Iterable<InetAddress> replicasMissingMostRecentCommit()
+    public Iterable<InetAddress> replicasMissingMostRecentCommit(CFMetaData metadata, int nowInSec)
     {
+        // In general, we need every replica that has answered the prepare (a quorum) to agree on the MRC (see the
+        // comment in StorageProxy.beginAndRepairPaxos(); basically we need to make sure at least a quorum of nodes
+        // have learned a commit before committing a new one, otherwise that previous commit is not guaranteed to have reached a
+        // quorum and further commits may proceed on incomplete information).
+        // However, if that commit is too old, it may have expired from some of the replicas' paxos tables (we don't
+        // keep the paxos state forever or it could grow unchecked), and we could end up in an infinite loop as
+        // explained in CASSANDRA-12043. To avoid that, we ignore an MRC that is too old, i.e. older than the TTL we set
+        // on paxos tables. For such old commits, we rely on hints and repair to ensure the commit has indeed been
+        // propagated to all nodes.
+        long paxosTtlSec = SystemKeyspace.paxosTtlSec(metadata);
+        if (UUIDGen.unixTimestampInSec(mostRecentCommit.ballot) + paxosTtlSec < nowInSec)
+            return Collections.emptySet();
+
         return Iterables.filter(commitsByReplica.keySet(), new Predicate<InetAddress>()
         {
             public boolean apply(InetAddress inetAddress)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/53f8f095/src/java/org/apache/cassandra/utils/UUIDGen.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/utils/UUIDGen.java b/src/java/org/apache/cassandra/utils/UUIDGen.java
index 00efbe3..a8b3093 100644
--- a/src/java/org/apache/cassandra/utils/UUIDGen.java
+++ b/src/java/org/apache/cassandra/utils/UUIDGen.java
@@ -26,8 +26,11 @@ import java.util.Collection;
 import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Ints;
 
 /**
  * The goods are here: www.ietf.org/rfc/rfc4122.txt.
@@ -211,6 +214,15 @@ public class UUIDGen
 
     /**
      * @param uuid
+     * @return seconds since Unix epoch
+     */
+    public static int unixTimestampInSec(UUID uuid)
+    {
+        return Ints.checkedCast(TimeUnit.MILLISECONDS.toSeconds(unixTimestamp(uuid)));
+    }
+
+    /**
+     * @param uuid
      * @return microseconds since Unix epoch
      */
     public static long microsTimestamp(UUID uuid)