You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/05/26 06:33:54 UTC
[03/15] cassandra git commit: Fail parent repair session if repair
coordinator dies
Fail parent repair session if repair coordinator dies
Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-11824
Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03180469
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03180469
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03180469
Branch: refs/heads/trunk
Commit: 03180469d62930ef73946458719223c8b9bec245
Parents: d27f9b0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed May 18 10:44:49 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu May 26 07:47:50 2016 +0200
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../repair/RepairMessageVerbHandler.java | 5 +-
.../cassandra/service/ActiveRepairService.java | 137 +++++++++++++++++--
.../cassandra/service/StorageService.java | 2 +-
.../LeveledCompactionStrategyTest.java | 2 +-
.../cassandra/repair/DifferencerTest.java | 3 +-
.../service/ActiveRepairServiceTest.java | 3 +-
7 files changed, 139 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8dfa02a..f73db6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
2.1.15
+ * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)
* Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840)
* Do not consider local node a valid source during replace (CASSANDRA-11848)
* Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index fd4ac28..7debc93 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -71,8 +71,9 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
columnFamilyStores.add(columnFamilyStore);
}
ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
- columnFamilyStores,
- prepareMessage.ranges);
+ message.from,
+ columnFamilyStores,
+ prepareMessage.ranges);
MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
break;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 5297ce3..f8975f9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -35,12 +35,18 @@ import org.slf4j.LoggerFactory;
import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.net.IAsyncCallbackWithFailure;
@@ -72,7 +78,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
* The creation of a repair session is done through the submitRepairSession that
* returns a future on the completion of that session.
*/
-public class ActiveRepairService
+public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
{
private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
// singleton enforcement
@@ -81,6 +87,8 @@ public class ActiveRepairService
public static final long UNREPAIRED_SSTABLE = 0;
private static final ThreadPoolExecutor executor;
+ private boolean registeredForEndpointChanges = false;
+
static
{
executor = new JMXConfigurableThreadPoolExecutor(4,
@@ -244,10 +252,10 @@ public class ActiveRepairService
return neighbors;
}
- public synchronized UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
+ public synchronized UUID prepareForRepair(InetAddress coordinator, Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
{
UUID parentRepairSession = UUIDGen.getTimeUUID();
- registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
+ registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, ranges);
final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
final AtomicBoolean status = new AtomicBoolean(true);
final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -309,9 +317,36 @@ public class ActiveRepairService
return parentRepairSession;
}
- public synchronized void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
+ public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
{
- parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, System.currentTimeMillis()));
+ if (!registeredForEndpointChanges)
+ {
+ Gossiper.instance.register(this);
+ FailureDetector.instance.registerFailureDetectionEventListener(this);
+ registeredForEndpointChanges = true;
+ }
+
+ cleanupOldParentRepairSessions();
+
+ parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, System.currentTimeMillis()));
+ }
+
+ /**
+ * Cleans up old failed parent repair sessions - if it is 24h old, we remove it from the map
+ */
+ private void cleanupOldParentRepairSessions()
+ {
+ long currentTime = System.currentTimeMillis();
+
+ Set<UUID> expired = new HashSet<>();
+ for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
+ {
+ ParentRepairSession session = entry.getValue();
+ if (session.failed && currentTime - session.repairedAt > TimeUnit.HOURS.toMillis(24))
+ expired.add(entry.getKey());
+ }
+ for (UUID remove : expired)
+ parentRepairSessions.remove(remove);
}
public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)
@@ -359,7 +394,13 @@ public class ActiveRepairService
public ParentRepairSession getParentRepairSession(UUID parentSessionId)
{
- return parentRepairSessions.get(parentSessionId);
+ ParentRepairSession session = parentRepairSessions.get(parentSessionId);
+ // this can happen if a node thinks that the coordinator was down, but that coordinator got back before noticing
+ // that it was down itself.
+ if (session != null && session.failed)
+ throw new RuntimeException("Parent repair session with id = " + parentSessionId + " has failed.");
+
+ return session;
}
public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
@@ -427,17 +468,34 @@ public class ActiveRepairService
public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
public final Collection<Range<Token>> ranges;
public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
+ /**
+ * used as fail time if failed is true
+ */
public final long repairedAt;
-
- public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt)
+ public final InetAddress coordinator;
+ /**
+ * Used to mark a repair as failed - if the coordinator thinks that the repair is still ongoing and sends a
+ * request, we need to fail the coordinator as well.
+ */
+ public final boolean failed;
+
+ public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed)
{
+ this.coordinator = coordinator;
for (ColumnFamilyStore cfs : columnFamilyStores)
{
+
this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
}
this.ranges = ranges;
this.repairedAt = repairedAt;
+ this.failed = failed;
+ }
+
+ public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt)
+ {
+ this(coordinator, columnFamilyStores, ranges, repairedAt, false);
}
@SuppressWarnings("resource")
@@ -457,6 +515,8 @@ public class ActiveRepairService
private Set<SSTableReader> getActiveSSTables(UUID cfId)
{
+ if (failed)
+ return Collections.emptySet();
Set<String> repairedSSTables = sstableMap.get(cfId);
Set<SSTableReader> activeSSTables = new HashSet<>();
Set<String> activeSSTableNames = new HashSet<>();
@@ -480,6 +540,10 @@ public class ActiveRepairService
}
}
+ public ParentRepairSession asFailed()
+ {
+ return new ParentRepairSession(coordinator, Collections.<ColumnFamilyStore>emptyList(), Collections.<Range<Token>>emptyList(), System.currentTimeMillis(), true);
+ }
@Override
public String toString()
{
@@ -491,4 +555,61 @@ public class ActiveRepairService
'}';
}
}
+
+ /*
+ If the coordinator node dies we should remove the parent repair session from the other nodes.
+ This uses the same notifications as we get in RepairSession
+ */
+ public void onJoin(InetAddress endpoint, EndpointState epState) {}
+ public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+ public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+ public void onAlive(InetAddress endpoint, EndpointState state) {}
+ public void onDead(InetAddress endpoint, EndpointState state) {}
+
+ public void onRemove(InetAddress endpoint)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ public void onRestart(InetAddress endpoint, EndpointState state)
+ {
+ convict(endpoint, Double.MAX_VALUE);
+ }
+
+ /**
+ * Something has happened to a remote node - if that node is a coordinator, we mark the parent repair session id as failed.
+ *
+ * The fail marker is kept in the map for 24h so that, if the coordinator does not agree
+ * that the repair failed, we can still fail the entire repair session
+ *
+ * @param ep endpoint to be convicted
+ * @param phi the value of phi with which ep was convicted
+ */
+ public void convict(InetAddress ep, double phi)
+ {
+ // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+ if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || parentRepairSessions.isEmpty())
+ return;
+
+ Set<UUID> toRemove = new HashSet<>();
+
+ for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry : parentRepairSessions.entrySet())
+ {
+ if (repairSessionEntry.getValue().coordinator.equals(ep))
+ {
+ toRemove.add(repairSessionEntry.getKey());
+ }
+ }
+
+ if (!toRemove.isEmpty())
+ {
+ logger.debug("Failing {} in parent repair sessions", toRemove);
+ for (UUID id : toRemove)
+ {
+ ParentRepairSession failed = parentRepairSessions.get(id);
+ parentRepairSessions.replace(id, failed, failed.asFailed());
+ }
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 507aedb..eea4556 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3013,7 +3013,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
{
try
{
- parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
+ parentSession = ActiveRepairService.instance.prepareForRepair(FBUtilities.getBroadcastAddress(), allNeighbors, ranges, columnFamilyStores);
}
catch (Throwable t)
{
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 749056c..6ec4c7b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -108,7 +108,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
Range<Token> range = new Range<>(Util.token(""), Util.token(""));
int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
UUID parentRepSession = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range));
+ ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range));
RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), ksname, cfname, range);
Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
CompactionManager.instance.submitValidation(cfs, validator).get();
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
index bc0f0de..3229c58 100644
--- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java
+++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.sink.IMessageSink;
import org.apache.cassandra.sink.SinkManager;
import org.apache.cassandra.repair.messages.RepairMessage;
import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.MerkleTree;
import static org.junit.Assert.assertEquals;
@@ -109,7 +110,7 @@ public class DifferencerTest extends SchemaLoader
Keyspace keyspace = Keyspace.open("Keyspace1");
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
- ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range));
+ ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range));
RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), "Keyspace1", "Standard1", range);
http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 419ea1a..26e5126 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.concurrent.Refs;
import static org.junit.Assert.assertEquals;
@@ -55,7 +56,7 @@ public class ActiveRepairServiceTest extends SchemaLoader
Set<SSTableReader> original = store.getUnrepairedSSTables();
UUID prsId = UUID.randomUUID();
- ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null);
+ ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
//add all sstables to parent repair session