You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/05/26 06:33:53 UTC

[02/15] cassandra git commit: Fail parent repair session if repair coordinator dies

Fail parent repair session if repair coordinator dies

Patch by marcuse; reviewed by Paulo Motta for CASSANDRA-11824


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/03180469
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/03180469
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/03180469

Branch: refs/heads/cassandra-2.2
Commit: 03180469d62930ef73946458719223c8b9bec245
Parents: d27f9b0
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed May 18 10:44:49 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu May 26 07:47:50 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../repair/RepairMessageVerbHandler.java        |   5 +-
 .../cassandra/service/ActiveRepairService.java  | 137 +++++++++++++++++--
 .../cassandra/service/StorageService.java       |   2 +-
 .../LeveledCompactionStrategyTest.java          |   2 +-
 .../cassandra/repair/DifferencerTest.java       |   3 +-
 .../service/ActiveRepairServiceTest.java        |   3 +-
 7 files changed, 139 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 8dfa02a..f73db6e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.15
+ * Clear out parent repair session if repair coordinator dies (CASSANDRA-11824)
  * Set default streaming_socket_timeout_in_ms to 24 hours (CASSANDRA-11840)
  * Do not consider local node a valid source during replace (CASSANDRA-11848)
  * Avoid holding SSTableReaders for duration of incremental repair (CASSANDRA-11739)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index fd4ac28..7debc93 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -71,8 +71,9 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
                         columnFamilyStores.add(columnFamilyStore);
                     }
                     ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
-                            columnFamilyStores,
-                            prepareMessage.ranges);
+                                                                             message.from,
+                                                                             columnFamilyStores,
+                                                                             prepareMessage.ranges);
                     MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
                     break;
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 5297ce3..f8975f9 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -35,12 +35,18 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.concurrent.JMXConfigurableThreadPoolExecutor;
 import org.apache.cassandra.concurrent.NamedThreadFactory;
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.db.ColumnFamilyStore;
 import org.apache.cassandra.db.compaction.CompactionManager;
 import org.apache.cassandra.dht.Range;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
+import org.apache.cassandra.gms.IEndpointStateChangeSubscriber;
+import org.apache.cassandra.gms.IFailureDetectionEventListener;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.net.IAsyncCallbackWithFailure;
@@ -72,7 +78,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
  * The creation of a repair session is done through the submitRepairSession that
  * returns a future on the completion of that session.
  */
-public class ActiveRepairService
+public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFailureDetectionEventListener
 {
     private static final Logger logger = LoggerFactory.getLogger(ActiveRepairService.class);
     // singleton enforcement
@@ -81,6 +87,8 @@ public class ActiveRepairService
     public static final long UNREPAIRED_SSTABLE = 0;
 
     private static final ThreadPoolExecutor executor;
+    private boolean registeredForEndpointChanges = false;
+
     static
     {
         executor = new JMXConfigurableThreadPoolExecutor(4,
@@ -244,10 +252,10 @@ public class ActiveRepairService
         return neighbors;
     }
 
-    public synchronized UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
+    public synchronized UUID prepareForRepair(InetAddress coordinator, Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
     {
         UUID parentRepairSession = UUIDGen.getTimeUUID();
-        registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
+        registerParentRepairSession(parentRepairSession, coordinator, columnFamilyStores, ranges);
         final CountDownLatch prepareLatch = new CountDownLatch(endpoints.size());
         final AtomicBoolean status = new AtomicBoolean(true);
         final Set<String> failedNodes = Collections.synchronizedSet(new HashSet<String>());
@@ -309,9 +317,36 @@ public class ActiveRepairService
         return parentRepairSession;
     }
 
-    public synchronized void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
+    public synchronized void registerParentRepairSession(UUID parentRepairSession, InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
     {
-        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, System.currentTimeMillis()));
+        if (!registeredForEndpointChanges)
+        {
+            Gossiper.instance.register(this);
+            FailureDetector.instance.registerFailureDetectionEventListener(this);
+            registeredForEndpointChanges = true;
+        }
+
+        cleanupOldParentRepairSessions();
+
+        parentRepairSessions.put(parentRepairSession, new ParentRepairSession(coordinator, columnFamilyStores, ranges, System.currentTimeMillis()));
+    }
+
+    /**
+     * Cleans up old failed parent repair sessions - a failed session that is more than 24h old is removed from the map
+     */
+    private void cleanupOldParentRepairSessions()
+    {
+        long currentTime = System.currentTimeMillis();
+
+        Set<UUID> expired = new HashSet<>();
+        for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
+        {
+            ParentRepairSession session = entry.getValue();
+            if (session.failed && currentTime - session.repairedAt > TimeUnit.HOURS.toMillis(24))
+                expired.add(entry.getKey());
+        }
+        for (UUID remove : expired)
+            parentRepairSessions.remove(remove);
     }
 
     public Set<SSTableReader> currentlyRepairing(UUID cfId, UUID parentRepairSession)
@@ -359,7 +394,13 @@ public class ActiveRepairService
 
     public ParentRepairSession getParentRepairSession(UUID parentSessionId)
     {
-        return parentRepairSessions.get(parentSessionId);
+        ParentRepairSession session = parentRepairSessions.get(parentSessionId);
+        // this can happen if a node thinks that the coordinator was down, but that coordinator got back before noticing
+        // that it was down itself.
+        if (session != null && session.failed)
+            throw new RuntimeException("Parent repair session with id = " + parentSessionId + " has failed.");
+
+        return session;
     }
 
     public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
@@ -427,17 +468,34 @@ public class ActiveRepairService
         public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
         public final Collection<Range<Token>> ranges;
         public final Map<UUID, Set<String>> sstableMap = new HashMap<>();
+        /**
+         * used as fail time if failed is true
+         */
         public final long repairedAt;
-
-        public ParentRepairSession(List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt)
+        public final InetAddress coordinator;
+        /**
+         * Used to mark a repair as failed - if the coordinator thinks that the repair is still ongoing and sends a
+         * request, we need to fail the coordinator as well.
+         */
+        public final boolean failed;
+
+        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed)
         {
+            this.coordinator = coordinator;
             for (ColumnFamilyStore cfs : columnFamilyStores)
             {
+
                 this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
                 sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
             }
             this.ranges = ranges;
             this.repairedAt = repairedAt;
+            this.failed = failed;
+        }
+
+        public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt)
+        {
+            this(coordinator, columnFamilyStores, ranges, repairedAt, false);
         }
 
         @SuppressWarnings("resource")
@@ -457,6 +515,8 @@ public class ActiveRepairService
 
         private Set<SSTableReader> getActiveSSTables(UUID cfId)
         {
+            if (failed)
+                return Collections.emptySet();
             Set<String> repairedSSTables = sstableMap.get(cfId);
             Set<SSTableReader> activeSSTables = new HashSet<>();
             Set<String> activeSSTableNames = new HashSet<>();
@@ -480,6 +540,10 @@ public class ActiveRepairService
             }
         }
 
+        public ParentRepairSession asFailed()
+        {
+            return new ParentRepairSession(coordinator, Collections.<ColumnFamilyStore>emptyList(), Collections.<Range<Token>>emptyList(), System.currentTimeMillis(), true);
+        }
         @Override
         public String toString()
         {
@@ -491,4 +555,61 @@ public class ActiveRepairService
                     '}';
         }
     }
+
+    /*
+    If the coordinator node dies we should remove the parent repair session from the other nodes.
+    This uses the same notifications as we get in RepairSession
+     */
+    public void onJoin(InetAddress endpoint, EndpointState epState) {}
+    public void beforeChange(InetAddress endpoint, EndpointState currentState, ApplicationState newStateKey, VersionedValue newValue) {}
+    public void onChange(InetAddress endpoint, ApplicationState state, VersionedValue value) {}
+    public void onAlive(InetAddress endpoint, EndpointState state) {}
+    public void onDead(InetAddress endpoint, EndpointState state) {}
+
+    public void onRemove(InetAddress endpoint)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    public void onRestart(InetAddress endpoint, EndpointState state)
+    {
+        convict(endpoint, Double.MAX_VALUE);
+    }
+
+    /**
+     * Something has happened to a remote node - if that node is a coordinator, we mark the parent repair session id as failed.
+     *
+     * The fail marker is kept in the map for 24h so that, if the coordinator does not agree
+     * that the repair failed, we can still fail the entire repair session
+     *
+     * @param ep  endpoint to be convicted
+     * @param phi the value of phi with which ep was convicted
+     */
+    public void convict(InetAddress ep, double phi)
+    {
+        // We want a higher confidence in the failure detection than usual because failing a repair wrongly has a high cost.
+        if (phi < 2 * DatabaseDescriptor.getPhiConvictThreshold() || parentRepairSessions.isEmpty())
+            return;
+
+        Set<UUID> toRemove = new HashSet<>();
+
+        for (Map.Entry<UUID, ParentRepairSession> repairSessionEntry : parentRepairSessions.entrySet())
+        {
+            if (repairSessionEntry.getValue().coordinator.equals(ep))
+            {
+                toRemove.add(repairSessionEntry.getKey());
+            }
+        }
+
+        if (!toRemove.isEmpty())
+        {
+            logger.debug("Failing {} in parent repair sessions", toRemove);
+            for (UUID id : toRemove)
+            {
+                ParentRepairSession failed = parentRepairSessions.get(id);
+                parentRepairSessions.replace(id, failed, failed.asFailed());
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/src/java/org/apache/cassandra/service/StorageService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index 507aedb..eea4556 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -3013,7 +3013,7 @@ public class StorageService extends NotificationBroadcasterSupport implements IE
                 {
                     try
                     {
-                        parentSession = ActiveRepairService.instance.prepareForRepair(allNeighbors, ranges, columnFamilyStores);
+                        parentSession = ActiveRepairService.instance.prepareForRepair(FBUtilities.getBroadcastAddress(), allNeighbors, ranges, columnFamilyStores);
                     }
                     catch (Throwable t)
                     {

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
index 749056c..6ec4c7b 100644
--- a/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
+++ b/test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
@@ -108,7 +108,7 @@ public class LeveledCompactionStrategyTest extends SchemaLoader
         Range<Token> range = new Range<>(Util.token(""), Util.token(""));
         int gcBefore = keyspace.getColumnFamilyStore(cfname).gcBefore(System.currentTimeMillis());
         UUID parentRepSession = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, Arrays.asList(cfs), Arrays.asList(range));
+        ActiveRepairService.instance.registerParentRepairSession(parentRepSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range));
         RepairJobDesc desc = new RepairJobDesc(parentRepSession, UUID.randomUUID(), ksname, cfname, range);
         Validator validator = new Validator(desc, FBUtilities.getBroadcastAddress(), gcBefore);
         CompactionManager.instance.submitValidation(cfs, validator).get();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/repair/DifferencerTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/repair/DifferencerTest.java b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
index bc0f0de..3229c58 100644
--- a/test/unit/org/apache/cassandra/repair/DifferencerTest.java
+++ b/test/unit/org/apache/cassandra/repair/DifferencerTest.java
@@ -41,6 +41,7 @@ import org.apache.cassandra.sink.IMessageSink;
 import org.apache.cassandra.sink.SinkManager;
 import org.apache.cassandra.repair.messages.RepairMessage;
 import org.apache.cassandra.repair.messages.SyncComplete;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MerkleTree;
 
 import static org.junit.Assert.assertEquals;
@@ -109,7 +110,7 @@ public class DifferencerTest extends SchemaLoader
         Keyspace keyspace = Keyspace.open("Keyspace1");
         ColumnFamilyStore cfs = keyspace.getColumnFamilyStore("Standard1");
 
-        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, Arrays.asList(cfs), Arrays.asList(range));
+        ActiveRepairService.instance.registerParentRepairSession(parentRepairSession, FBUtilities.getBroadcastAddress(), Arrays.asList(cfs), Arrays.asList(range));
 
         RepairJobDesc desc = new RepairJobDesc(parentRepairSession, UUID.randomUUID(), "Keyspace1", "Standard1", range);
 

http://git-wip-us.apache.org/repos/asf/cassandra/blob/03180469/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 419ea1a..26e5126 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -37,6 +37,7 @@ import org.apache.cassandra.db.Keyspace;
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.io.sstable.SSTableReader;
 import org.apache.cassandra.utils.ByteBufferUtil;
+import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Refs;
 
 import static org.junit.Assert.assertEquals;
@@ -55,7 +56,7 @@ public class ActiveRepairServiceTest extends SchemaLoader
         Set<SSTableReader> original = store.getUnrepairedSSTables();
 
         UUID prsId = UUID.randomUUID();
-        ActiveRepairService.instance.registerParentRepairSession(prsId, Collections.singletonList(store), null);
+        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
 
         //add all sstables to parent repair session