You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink in the original page and is not preserved in this plain-text copy.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2015/12/02 15:56:17 UTC
[03/15] cassandra git commit: Fix incremental repair hang when replica is down
Fix incremental repair hang when replica is down
patch by yukim; reviewed by marcuse for CASSANDRA-10288
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/1538c092
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/1538c092
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/1538c092
Branch: refs/heads/trunk
Commit: 1538c0921444d7969ebd07ca1abda9a7e40e4c73
Parents: 0b26ca6
Author: Yuki Morishita <yu...@apache.org>
Authored: Wed Dec 2 08:41:11 2015 -0600
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Dec 2 08:41:11 2015 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 29 +++++++++++++-------
.../cassandra/service/ActiveRepairService.java | 17 +++++++++---
3 files changed, 33 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index e00abfe..9c5e2d5 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.12
+ * Fix incremental repair hang when replica is down (CASSANDRA-10288)
* Avoid writing range tombstones after END_OF_ROW marker (CASSANDRA-10791)
* Optimize the way we check if a token is repaired in anticompaction (CASSANDRA-10768)
* Add proper error handling to stream receiver (CASSANDRA-10774)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index f41d26c..8b68fd3 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -17,6 +17,7 @@
*/
package org.apache.cassandra.repair;
+import java.io.IOException;
import java.net.InetAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -24,6 +25,7 @@ import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.AbstractFuture;
import org.apache.cassandra.db.SystemKeyspace;
+import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
@@ -52,25 +54,32 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void run()
{
- AnticompactionRequest acr = new AnticompactionRequest(parentSession);
- SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
- if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
+ if (FailureDetector.instance.isAlive(neighbor))
{
- if (doAnticompaction)
+ AnticompactionRequest acr = new AnticompactionRequest(parentSession);
+ SemanticVersion peerVersion = SystemKeyspace.getReleaseVersion(neighbor);
+ if (peerVersion != null && peerVersion.compareTo(VERSION_CHECKER) > 0)
{
- MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ if (doAnticompaction)
+ {
+ MessagingService.instance().sendRR(acr.createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ }
+ else
+ {
+ // we need to clean up parent session
+ MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ }
}
else
{
- // we need to clean up parent session
- MessagingService.instance().sendRR(new CleanupMessage(parentSession).createMessage(), neighbor, new AnticompactionCallback(this), TimeUnit.DAYS.toMillis(1), true);
+ MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+ // immediately return after sending request
+ set(neighbor);
}
}
else
{
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
- // immediately return after sending request
- set(neighbor);
+ setException(new IOException(neighbor + " is down"));
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/1538c092/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 4266f41..dd80d4c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -277,11 +277,20 @@ public class ActiveRepairService
for (ColumnFamilyStore cfs : columnFamilyStores)
cfIds.add(cfs.metadata.cfId);
- for(InetAddress neighbour : endpoints)
+ for (InetAddress neighbour : endpoints)
{
- PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
- MessageOut<RepairMessage> msg = message.createMessage();
- MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+ if (FailureDetector.instance.isAlive(neighbour))
+ {
+ PrepareMessage message = new PrepareMessage(parentRepairSession, cfIds, ranges);
+ MessageOut<RepairMessage> msg = message.createMessage();
+ MessagingService.instance().sendRR(msg, neighbour, callback, TimeUnit.HOURS.toMillis(1), true);
+ }
+ else
+ {
+ status.set(false);
+ failedNodes.add(neighbour.getHostAddress());
+ prepareLatch.countDown();
+ }
}
try
{