You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by yu...@apache.org on 2016/11/16 23:27:08 UTC
[04/10] cassandra git commit: Fail repair if participant dies during
sync or anticompaction
Fail repair if participant dies during sync or anticompaction
Patch by Paulo Motta; Reviewed by Yuki Morishita for CASSANDRA-12901
This reverts the behavior, introduced by CASSANDRA-3569, of repair relying
exclusively on TCP keep-alive to detect node failures during sync.
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/84b9e727
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/84b9e727
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/84b9e727
Branch: refs/heads/trunk
Commit: 84b9e72788816b571cb50404ccb9fb9b5f19ba4f
Parents: ebf3507
Author: Paulo Motta <pa...@gmail.com>
Authored: Mon Nov 14 18:56:16 2016 -0200
Committer: Yuki Morishita <yu...@apache.org>
Committed: Wed Nov 16 15:20:46 2016 -0600
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../cassandra/repair/AnticompactionTask.java | 80 ++++++++++++++++++--
.../apache/cassandra/repair/RepairSession.java | 13 ----
.../cassandra/service/ActiveRepairService.java | 30 ++++++--
4 files changed, 99 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 98c1839..3482052 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.2.9
+ * Fail repair if participant dies during sync or anticompaction (CASSANDRA-12901)
* cqlsh COPY: unprotected pk values before converting them if not using prepared statements (CASSANDRA-12863)
* Fix Util.spinAssertEquals (CASSANDRA-12283)
* Fix potential NPE for compactionstats (CASSANDRA-12462)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/AnticompactionTask.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/AnticompactionTask.java b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
index 8ecae23..c5e066d 100644
--- a/src/java/org/apache/cassandra/repair/AnticompactionTask.java
+++ b/src/java/org/apache/cassandra/repair/AnticompactionTask.java
@@ -22,30 +22,43 @@ import java.net.InetAddress;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.util.concurrent.AbstractFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.repair.messages.AnticompactionRequest;
import org.apache.cassandra.utils.CassandraVersion;
-public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable
+public class AnticompactionTask extends AbstractFuture<InetAddress> implements Runnable, IEndpointStateChangeSubscriber,
+ IFailureDetectionEventListener
{
/*
* Version that anticompaction response is not supported up to.
* If Cassandra version is more than this, we need to wait for anticompaction response.
*/
private static final CassandraVersion VERSION_CHECKER = new CassandraVersion("2.1.5");
+ private static final Logger logger = LoggerFactory.getLogger(AnticompactionTask.class);
private final UUID parentSession;
private final InetAddress neighbor;
private final Collection<Range<Token>> successfulRanges;
+ private final AtomicBoolean isFinished = new AtomicBoolean(false);
public AnticompactionTask(UUID parentSession, InetAddress neighbor, Collection<Range<Token>> successfulRanges)
{
@@ -66,21 +79,41 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
}
else
{
- MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
// immediately return after sending request
- set(neighbor);
+ MessagingService.instance().sendOneWay(acr.createMessage(), neighbor);
+ maybeSetResult(neighbor);
}
}
else
{
- setException(new IOException(neighbor + " is down"));
+ maybeSetException(new IOException(neighbor + " is down"));
+ }
+ }
+
+ private boolean maybeSetException(Throwable t)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ setException(t);
+ return true;
}
+ return false;
+ }
+
+ private boolean maybeSetResult(InetAddress o)
+ {
+ if (isFinished.compareAndSet(false, true))
+ {
+ set(o);
+ return true;
+ }
+ return false;
}
/**
* Callback for antitcompaction request. Run on INTERNAL_RESPONSE stage.
*/
- public static class AnticompactionCallback implements IAsyncCallbackWithFailure
+ public class AnticompactionCallback implements IAsyncCallbackWithFailure
{
final AnticompactionTask task;
@@ -91,7 +124,7 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void response(MessageIn msg)
{
- task.set(msg.from);
+ task.maybeSetResult(msg.from);
}
public boolean isLatencyForSnitch()
@@ -101,7 +134,40 @@ public class AnticompactionTask extends AbstractFuture<InetAddress> implements R
public void onFailure(InetAddress from)
{
- task.setException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ task.maybeSetException(new RuntimeException("Anticompaction failed or timed out in " + from));
+ }
+ }
+
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState epState)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void convict(InetAddress endpoint, double phi)
+ {
+ if (!neighbor.equals(endpoint))
+ return;
+
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold())
+ return;
+
+ Exception exception = new IOException(String.format("Endpoint %s died during anti-compaction.", endpoint));
+ if (maybeSetException(exception))
+ {
+ // Though unlikely, it is possible to arrive here multiple times and we want to avoid printing an error message twice
+ logger.error("[repair #{}] Endpoint {} died during anti-compaction", parentSession, endpoint, exception);
}
}
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/repair/RepairSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairSession.java b/src/java/org/apache/cassandra/repair/RepairSession.java
index a2dcdd1..70bfaa6 100644
--- a/src/java/org/apache/cassandra/repair/RepairSession.java
+++ b/src/java/org/apache/cassandra/repair/RepairSession.java
@@ -23,7 +23,6 @@ import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.*;
@@ -92,9 +91,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
public final Set<InetAddress> endpoints;
private final long repairedAt;
- // number of validations left to be performed
- private final AtomicInteger validationRemaining;
-
private final AtomicBoolean isFailed = new AtomicBoolean(false);
// Each validation task waits response from replica in validating ConcurrentMap (keyed by CF name and endpoint address)
@@ -138,7 +134,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
this.range = range;
this.endpoints = endpoints;
this.repairedAt = repairedAt;
- this.validationRemaining = new AtomicInteger(cfnames.length);
}
public UUID getId()
@@ -181,14 +176,6 @@ public class RepairSession extends AbstractFuture<RepairSessionResult> implement
logger.info("[repair #{}] {}", getId(), message);
Tracing.traceRepair(message);
task.treeReceived(tree);
-
- // Unregister from FailureDetector once we've completed synchronizing Merkle trees.
- // After this point, we rely on tcp_keepalive for individual sockets to notify us when a connection is down.
- // See CASSANDRA-3569
- if (validationRemaining.decrementAndGet() == 0)
- {
- FailureDetector.instance.unregisterFailureDetectionEventListener(this);
- }
}
/**
http://git-wip-us.apache.org/repos/asf/cassandra/blob/84b9e727/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 38804b3..7d56e4b 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -29,6 +29,7 @@ import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
@@ -147,10 +148,9 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
sessions.put(session.getId(), session);
// register listeners
- gossiper.register(session);
- failureDetector.registerFailureDetectionEventListener(session);
+ registerOnFdAndGossip(session);
- // unregister listeners at completion
+ // remove session at completion
session.addListener(new Runnable()
{
/**
@@ -158,8 +158,6 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
*/
public void run()
{
- failureDetector.unregisterFailureDetectionEventListener(session);
- gossiper.unregister(session);
sessions.remove(session.getId());
}
}, MoreExecutors.sameThreadExecutor());
@@ -167,6 +165,27 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
return session;
}
+ private <T extends AbstractFuture<?> &
+ IEndpointStateChangeSubscriber &
+ IFailureDetectionEventListener> void registerOnFdAndGossip(final T task)
+ {
+ gossiper.register(task);
+ failureDetector.registerFailureDetectionEventListener(task);
+
+ // unregister listeners at completion
+ task.addListener(new Runnable()
+ {
+ /**
+ * Unregister the task from gossip and failure detection once it completes
+ */
+ public void run()
+ {
+ failureDetector.unregisterFailureDetectionEventListener(task);
+ gossiper.unregister(task);
+ }
+ }, MoreExecutors.sameThreadExecutor());
+ }
+
public synchronized void terminateSessions()
{
Throwable cause = new IOException("Terminate session is called");
@@ -362,6 +381,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
for (InetAddress neighbor : neighbors)
{
AnticompactionTask task = new AnticompactionTask(parentSession, neighbor, successfulRanges);
+ registerOnFdAndGossip(task);
tasks.add(task);
task.run(); // 'run' is just sending message
}