You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/02 15:56:19 UTC

[05/15] cassandra git commit: Fix incremental repair hang when replica is down

Fix incremental repair hang when replica is down

patch by yukim; reviewed by marcuse for CASSANDRA-10288


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1538c092
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1538c092
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1538c092

Branch: refs/heads/cassandra-3.1
Commit: 1538c0921444d7969ebd07ca1abda9a7e40e4c73
Parents: 0b26ca6
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Dec 2 08:41:11 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Dec 2 08:41:11 2015 -0600

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../cassandra/repair/AnticompactionTask.java    | 29 +++++++++++++-------
 .../cassandra/service/ActiveRepairService.java  | 17 +++++++++---
 3 files changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e00abfe..9c5e2d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.12
+ * Fix incremental repair hang when replica is down (CASSANDRA-10288)
  * Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
  * Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
  * Add proper error handling to stream receiver (CASSANDRA-10774)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index f41d26c..8b68fd3 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -17,6 +17,7 @@
  */
 package org.apache.cassandra.repair;
 
+import java.io.IOException;
 import java.net.InetAddress;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
@@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.util.concurrent.AbstractFuture;
 
 import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
 import org.apache.cassandra.net.MessageIn;
 import org.apache.cassandra.net.MessagingService;
@@ -52,25 +54,32 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
 
     public void run()
     {
-        AnticompactionRequest acr = new AnticompactionRequest(parentSession);
-        SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
-        if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+        if (FailureDetector.instance.isAlive(neighbor))
         {
-            if (doAnticompaction)
+            AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+            SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
+            if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
             {
-                MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+                if (doAnticompaction)
+                {
+                    MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+                }
+                else
+                {
+                    // we need to clean up parent session
+                    MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+                }
             }
             else
             {
-                // we need to clean up parent session
-                MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+                MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+                // immediately return after sending request
+                set(neighbor);
             }
         }
         else
         {
-            MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
-            // immediately return after sending request
-            set(neighbor);
+            setException(new IOException(neighbor + " is down"));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 4266f41..dd80d4c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -277,11 +277,20 @@ public class ActiveRepairService
         for (ColumnFamilyStore cfs : columnFamilyStores)
             cfIds.add(cfs.metadata.cfId);
 
-        for(InetAddress neighbour : endpoints)
+        for (InetAddress neighbour : endpoints)
         {
-            PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
-            MessageOut<RepairMessage> msg = message.createMessage();
-            MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+            if (FailureDetector.instance.isAlive(neighbour))
+            {
+                PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
+                MessageOut<RepairMessage> msg = message.createMessage();
+                MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+            }
+            else
+            {
+                status.set(false);
+                failedNodes.add(neighbour.getHostAddress());
+                prepareLatch.countDown();
+            }
         }
         try
         {