You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/11/16 23:27:08 UTC

[04/10] cassandra git commit: Fail repair if participant dies during sync or anticompaction

Fail repair if participant dies during sync or anticompaction

Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901

This reverts the behavior of repair relying exclusively on TCP keep-alive to
detect node failures during sync introduced by CASSANDRA-3569.


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727

Branch: refs/heads/trunk
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 80 ++++++++++++++++++--
 .../apache/cassandra/repair/RepairSession.java  | 13 ----
 .../cassandra/service/ActiveRepairService.java  | 30 ++++++--
 4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
  * cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
  * Fix Util.spinAssertEquals (CASSANDRA-12283)
  * Fix potential NPE for compactionstats (CASSANDRA-12462)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
 import java.util.Collection;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.util.concurrent.AbstractFuture;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.SystemKeyspace;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
 import org.apache.cassandra.repair.messages.AnticompactionRequest;
 import org.apache.cassandra.utils.CassandraVersion;
 
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+                                                                               IFailureDetectionEventListener
 {
     /*
      * Version that anticompaction response is not supported up to.
      * If Cassandra version is more than this, we need to wait for anticompaction response.
      */
     private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+    private static final Logger logger = LoggerFactory.getLogger(AnticompactionTask.class);
 
     private final UUID parentSession;
     private final InetAddress neighbor;
     private final Collection<Range<Token>> successfulRanges;
+    private final AtomicBoolean isFinished = new AtomicBoolean(false);
 
     public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
     {
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
             }
             else
             {
-                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
                 // immediately return after sending request
-                set(neighbor);
+                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+                maybeSetResult(neighbor);
             }
         }
         else
         {
-            setException(new IOException(neighbor + " is down"));
+            maybeSetException(new IOException(neighbor + " is down"));
+        }
+    }
+
+    private boolean maybeSetException(Throwable t)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            setException(t);
+            return true;
         }
+        return false;
+    }
+
+    private boolean maybeSetResult(InetAddress o)
+    {
+        if (isFinished.compareAndSet(false, true))
+        {
+            set(o);
+            return true;
+        }
+        return false;
     }
 
     /**
      * Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
      */
-    public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+    public class AnticompactionCallback implements IAsyncCallbackWithFailure
     {
         final AnticompactionTask task;
 
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void response(MessageIn msg)
         {
-            task.set(msg.from);
+            maybeSetResult(msg.from);
         }
 
         public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
         public void onFailure(InetAddress from)
         {
-            task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+            maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+        }
+    }
+
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState epState)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void convict(InetAddress endpoint, double phi)
+    {
+        if (!neighbor.equals(endpoint))
+            return;
+
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+            return;
+
+        Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+        if (maybeSetException(exception))
+        {
+            // Though unlikely, it is possible to arrive here multiple times; only the call that fails the task logs the error
+            logger.error("[repair #{}] Endpoint {} died during anti-compaction", parentSession, endpoint, exception);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
     public final Set<InetAddress> endpoints;
     private final long repairedAt;
 
-    // number of validations left to be performed
-    private final AtomicInteger validationRemaining;
-
     private final AtomicBoolean isFailed = new AtomicBoolean(false);
 
     // Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         this.range = range;
         this.endpoints = endpoints;
         this.repairedAt = repairedAt;
-        this.validationRemaining = new AtomicInteger(cfnames.length);
     }
 
     public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
         logger.info("[repair #{}] {}", getId(), message);
         Tracing.traceRepair(message);
         task.treeReceived(tree);
-
-        // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
-        // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
-        // See CASSANDRA-3569
-        if (validationRemaining.decrementAndGet() == 0)
-        {
-            FailureDetector.instance.unregisterFailureDetectionEventListener(this);
-        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
 
         sessions.put(session.getId(), session);
         // register listeners
-        gossiper.register(session);
-        failureDetector.registerFailureDetectionEventListener(session);
+        registerOnFdAndGossip(session);
 
-        // unregister listeners at completion
+        // remove session at completion
         session.addListener(new Runnable()
         {
             /**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
              */
             public void run()
             {
-                failureDetector.unregisterFailureDetectionEventListener(session);
-                gossiper.unregister(session);
                 sessions.remove(session.getId());
             }
         }, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         return session;
     }
 
+    private <T extends AbstractFuture<?> &
+               IEndpointStateChangeSubscriber &
+               IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+    {
+        gossiper.register(task);
+        failureDetector.registerFailureDetectionEventListener(task);
+
+        // unregister listeners at completion
+        task.addListener(new Runnable()
+        {
+            /**
+             * Once the task's future completes, stop delivering gossip and failure detector events to it
+             */
+            public void run()
+            {
+                failureDetector.unregisterFailureDetectionEventListener(task);
+                gossiper.unregister(task);
+            }
+        }, MoreExecutors.sameThreadExecutor());
+    }
+
     public synchronized void terminateSessions()
     {
         Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         for (InetAddress neighbor : neighbors)
         {
             AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+            registerOnFdAndGossip(task);
             tasks.add(task);
             task.run(); // 'run' is just sending message
         }