You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain text copy.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2015/01/21 10:38:19 UTC

cassandra git commit: Don't allow starting several repairs on the same sstables

Repository: cassandra
Updated Branches:
  refs/heads/cassandra-2.1 caa4f3d34 -> 6d9d175a4


Don't allow starting several repairs on the same sstables

Patch by marcuse; reviewed by yukim for CASSANDRA-8316


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/6d9d175a
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/6d9d175a
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/6d9d175a

Branch: refs/heads/cassandra-2.1
Commit: 6d9d175a44a6beb7be17ffebcbee45112d48eeea
Parents: caa4f3d
Author: Marcus Eriksson <ma...@apache.org>
Authored: Wed Dec 3 13:25:00 2014 +0100
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Wed Jan 21 08:41:29 2015 +0100

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../db/compaction/CompactionManager.java        |  26 +++-
 .../repair/RepairMessageVerbHandler.java        | 134 ++++++++++---------
 .../cassandra/service/ActiveRepairService.java  |  51 ++++---
 .../cassandra/streaming/StreamSession.java      |   1 -
 5 files changed, 129 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 296aa66..ea2ecc0 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 2.1.3
+ * Don't allow starting multiple inc repairs on the same sstables (CASSANDRA-8316)
  * Invalidate prepared BATCH statements when related tables
    or keyspaces are dropped (CASSANDRA-8652)
  * Fix missing results in secondary index queries on collections

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index eb7c0ee..02f5e81 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -21,6 +21,7 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -369,15 +370,32 @@ public class CompactionManager implements CompactionManagerMBean
 
     public Future<?> submitAntiCompaction(final ColumnFamilyStore cfs,
                                           final Collection<Range<Token>> ranges,
-                                          final Collection<SSTableReader> validatedForRepair,
+                                          final Collection<SSTableReader> sstables,
                                           final long repairedAt)
     {
         Runnable runnable = new WrappedRunnable() {
-
             @Override
             public void runMayThrow() throws Exception
             {
-                performAnticompaction(cfs, ranges, validatedForRepair, repairedAt);
+                boolean success = false;
+                while (!success)
+                {
+                    for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
+                    {
+                        if (sstables.remove(compactingSSTable))
+                            SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
+                    }
+                    Set<SSTableReader> compactedSSTables = new HashSet<>();
+                    for (SSTableReader sstable : sstables)
+                    {
+                        if (sstable.isMarkedCompacted())
+                            compactedSSTables.add(sstable);
+                    }
+                    sstables.removeAll(compactedSSTables);
+                    SSTableReader.releaseReferences(compactedSSTables);
+                    success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
+                }
+                performAnticompaction(cfs, ranges, sstables, repairedAt);
             }
         };
         return executor.submit(runnable);
@@ -398,7 +416,7 @@ public class CompactionManager implements CompactionManagerMBean
                                       Collection<SSTableReader> validatedForRepair,
                                       long repairedAt) throws InterruptedException, ExecutionException, IOException
     {
-        logger.info("Starting anticompaction");
+        logger.info("Starting anticompaction for {}.{} on {}/{} sstables", cfs.keyspace.getName(), cfs.getColumnFamilyName(), validatedForRepair.size(), cfs.getSSTables().size());
         logger.debug("Starting anticompaction for ranges {}", ranges);
         Set<SSTableReader> sstables = new HashSet<>(validatedForRepair);
         Set<SSTableReader> mutatedRepairStatuses = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
index 141e4c7..c7cf4c8 100644
--- a/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
+++ b/src/java/org/apache/cassandra/repair/RepairMessageVerbHandler.java
@@ -57,78 +57,88 @@ public class RepairMessageVerbHandler implements IVerbHandler<RepairMessage>
     {
         // TODO add cancel/interrupt message
         RepairJobDesc desc = message.payload.desc;
-        switch (message.payload.messageType)
+        try
         {
-            case PREPARE_MESSAGE:
-                PrepareMessage prepareMessage = (PrepareMessage) message.payload;
-                List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
-                for (UUID cfId : prepareMessage.cfIds)
-                {
-                    Pair<String, String> kscf = Schema.instance.getCF(cfId);
-                    ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
-                    columnFamilyStores.add(columnFamilyStore);
-                }
-                ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
-                                                                         columnFamilyStores,
-                                                                         prepareMessage.ranges);
-                MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
-                break;
-
-            case SNAPSHOT:
-                ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
-                final Range<Token> repairingRange = desc.range;
-                cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
-                {
-                    public boolean apply(SSTableReader sstable)
+            switch (message.payload.messageType)
+            {
+                case PREPARE_MESSAGE:
+                    PrepareMessage prepareMessage = (PrepareMessage) message.payload;
+                    List<ColumnFamilyStore> columnFamilyStores = new ArrayList<>(prepareMessage.cfIds.size());
+                    for (UUID cfId : prepareMessage.cfIds)
                     {
-                        return sstable != null &&
-                               !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
-                               new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+                        Pair<String, String> kscf = Schema.instance.getCF(cfId);
+                        ColumnFamilyStore columnFamilyStore = Keyspace.open(kscf.left).getColumnFamilyStore(kscf.right);
+                        columnFamilyStores.add(columnFamilyStore);
                     }
-                });
+                    ActiveRepairService.instance.registerParentRepairSession(prepareMessage.parentRepairSession,
+                            columnFamilyStores,
+                            prepareMessage.ranges);
+                    MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                    break;
 
-                logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
-                MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
-                break;
+                case SNAPSHOT:
+                    ColumnFamilyStore cfs = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                    final Range<Token> repairingRange = desc.range;
+                    cfs.snapshot(desc.sessionId.toString(), new Predicate<SSTableReader>()
+                    {
+                        public boolean apply(SSTableReader sstable)
+                        {
+                            return sstable != null &&
+                                    !(sstable.partitioner instanceof LocalPartitioner) && // exclude SSTables from 2i
+                                    new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
+                        }
+                    });
 
-            case VALIDATION_REQUEST:
-                ValidationRequest validationRequest = (ValidationRequest) message.payload;
-                // trigger read-only compaction
-                ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
+                    logger.debug("Enqueuing response to snapshot request {} to {}", desc.sessionId, message.from);
+                    MessagingService.instance().sendReply(new MessageOut(MessagingService.Verb.INTERNAL_RESPONSE), id, message.from);
+                    break;
 
-                Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
-                CompactionManager.instance.submitValidation(store, validator);
-                break;
+                case VALIDATION_REQUEST:
+                    ValidationRequest validationRequest = (ValidationRequest) message.payload;
+                    // trigger read-only compaction
+                    ColumnFamilyStore store = Keyspace.open(desc.keyspace).getColumnFamilyStore(desc.columnFamily);
 
-            case SYNC_REQUEST:
-                // forwarded sync request
-                SyncRequest request = (SyncRequest) message.payload;
-                StreamingRepairTask task = new StreamingRepairTask(desc, request);
-                task.run();
-                break;
+                    Validator validator = new Validator(desc, message.from, validationRequest.gcBefore);
+                    CompactionManager.instance.submitValidation(store, validator);
+                    break;
 
-            case ANTICOMPACTION_REQUEST:
-                logger.debug("Got anticompaction request");
-                AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
-                try
-                {
-                    List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
-                    FBUtilities.waitOnFutures(futures);
-                }
-                catch (Exception e)
-                {
-                    throw new RuntimeException(e);
-                }
-                finally
-                {
-                    ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
-                }
+                case SYNC_REQUEST:
+                    // forwarded sync request
+                    SyncRequest request = (SyncRequest) message.payload;
+                    StreamingRepairTask task = new StreamingRepairTask(desc, request);
+                    task.run();
+                    break;
 
-                break;
+                case ANTICOMPACTION_REQUEST:
+                    logger.debug("Got anticompaction request");
+                    AnticompactionRequest anticompactionRequest = (AnticompactionRequest) message.payload;
+                    try
+                    {
+                        List<Future<?>> futures = ActiveRepairService.instance.doAntiCompaction(anticompactionRequest.parentRepairSession);
+                        FBUtilities.waitOnFutures(futures);
+                    }
+                    catch (Exception e)
+                    {
+                        throw new RuntimeException(e);
+                    }
+                    finally
+                    {
+                        ActiveRepairService.instance.removeParentRepairSession(anticompactionRequest.parentRepairSession);
+                    }
+
+                    break;
 
-            default:
-                ActiveRepairService.instance.handleMessage(message.from, message.payload);
-                break;
+                default:
+                    ActiveRepairService.instance.handleMessage(message.from, message.payload);
+                    break;
+            }
+        }
+        catch (Exception e)
+        {
+            logger.error("Got error, removing parent repair session");
+            if (desc!=null && desc.parentSessionId != null)
+                ActiveRepairService.instance.removeParentRepairSession(desc.parentSessionId);
+            throw new RuntimeException(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 17cf6ef..36f7c5c 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -139,7 +139,7 @@ public class ActiveRepairService
         sessions.remove(session.getId());
     }
 
-    public void terminateSessions()
+    public synchronized void terminateSessions()
     {
         for (RepairSession session : sessions.values())
         {
@@ -241,7 +241,7 @@ public class ActiveRepairService
         return neighbors;
     }
 
-    public UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
+    public synchronized UUID prepareForRepair(Set<InetAddress> endpoints, Collection<Range<Token>> ranges, List<ColumnFamilyStore> columnFamilyStores)
     {
         UUID parentRepairSession = UUIDGen.getTimeUUID();
         registerParentRepairSession(parentRepairSession, columnFamilyStores, ranges);
@@ -297,18 +297,24 @@ public class ActiveRepairService
         return parentRepairSession;
     }
 
-    public void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
+    public synchronized void registerParentRepairSession(UUID parentRepairSession, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges)
     {
         Map<UUID, Set<SSTableReader>> sstablesToRepair = new HashMap<>();
         for (ColumnFamilyStore cfs : columnFamilyStores)
         {
             Set<SSTableReader> sstables = new HashSet<>();
+            Set<SSTableReader> currentlyRepairing = currentlyRepairing(cfs.metadata.cfId);
             for (SSTableReader sstable : cfs.getSSTables())
             {
                 if (new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(ranges))
                 {
                     if (!sstable.isRepaired())
                     {
+                        if (currentlyRepairing.contains(sstable))
+                        {
+                            logger.error("Already repairing "+sstable+", can not continue.");
+                            throw new RuntimeException("Already repairing "+sstable+", can not continue.");
+                        }
                         sstables.add(sstable);
                     }
                 }
@@ -318,7 +324,19 @@ public class ActiveRepairService
         parentRepairSessions.put(parentRepairSession, new ParentRepairSession(columnFamilyStores, ranges, sstablesToRepair, System.currentTimeMillis()));
     }
 
-    public void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException
+    private Set<SSTableReader> currentlyRepairing(UUID cfId)
+    {
+        Set<SSTableReader> repairing = new HashSet<>();
+        for (Map.Entry<UUID, ParentRepairSession> entry : parentRepairSessions.entrySet())
+        {
+            Collection<SSTableReader> sstables = entry.getValue().sstableMap.get(cfId);
+            if (sstables != null)
+                repairing.addAll(sstables);
+        }
+        return repairing;
+    }
+
+    public synchronized void finishParentSession(UUID parentSession, Set<InetAddress> neighbors, boolean doAntiCompaction) throws InterruptedException, ExecutionException, IOException
     {
         try
         {
@@ -345,7 +363,7 @@ public class ActiveRepairService
         return parentRepairSessions.get(parentSessionId);
     }
 
-    public ParentRepairSession removeParentRepairSession(UUID parentSessionId)
+    public synchronized ParentRepairSession removeParentRepairSession(UUID parentSessionId)
     {
         return parentRepairSessions.remove(parentSessionId);
     }
@@ -358,20 +376,8 @@ public class ActiveRepairService
         List<Future<?>> futures = new ArrayList<>();
         for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
         {
-
             Collection<SSTableReader> sstables = new HashSet<>(prs.getAndReferenceSSTables(columnFamilyStoreEntry.getKey()));
             ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
-            boolean success = false;
-            while (!success)
-            {
-                for (SSTableReader compactingSSTable : cfs.getDataTracker().getCompacting())
-                {
-                    if (sstables.remove(compactingSSTable))
-                        SSTableReader.releaseReferences(Arrays.asList(compactingSSTable));
-                }
-                success = sstables.isEmpty() || cfs.getDataTracker().markCompacting(sstables);
-            }
-
             futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
         }
 
@@ -435,5 +441,16 @@ public class ActiveRepairService
             }
             return sstables;
         }
+
+        @Override
+        public String toString()
+        {
+            return "ParentRepairSession{" +
+                    "columnFamilyStores=" + columnFamilyStores +
+                    ", ranges=" + ranges +
+                    ", sstableMap=" + sstableMap +
+                    ", repairedAt=" + repairedAt +
+                    '}';
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/cassandra/blob/6d9d175a/src/java/org/apache/cassandra/streaming/StreamSession.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/streaming/StreamSession.java b/src/java/org/apache/cassandra/streaming/StreamSession.java
index b03d28a..5617b04 100644
--- a/src/java/org/apache/cassandra/streaming/StreamSession.java
+++ b/src/java/org/apache/cassandra/streaming/StreamSession.java
@@ -300,7 +300,6 @@ public class StreamSession implements IEndpointStateChangeSubscriber
                 ColumnFamilyStore.ViewFragment view = cfStore.selectAndReference(cfStore.viewFilter(rowBoundsList));
                 sstables.addAll(view.sstables);
             }
-
             List<SSTableStreamingSections> sections = new ArrayList<>(sstables.size());
             for (SSTableReader sstable : sstables)
             {