You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@cassandra.apache.org by ma...@apache.org on 2016/06/23 09:29:12 UTC

[2/9] cassandra git commit: Avoid marking too many sstables as repaired

Avoid marking too many sstables as repaired

Patch by marcuse; reviewed by Joel Knighton for CASSANDRA-11696


Project: http://git-wip-us.apache.org/repos/asf/cassandra/repo
Commit: http://git-wip-us.apache.org/repos/asf/cassandra/commit/3c8421a3
Tree: http://git-wip-us.apache.org/repos/asf/cassandra/tree/3c8421a3
Diff: http://git-wip-us.apache.org/repos/asf/cassandra/diff/3c8421a3

Branch: refs/heads/cassandra-3.0
Commit: 3c8421a3304d44c064c230c329a999373feb0607
Parents: 452d626
Author: Marcus Eriksson <ma...@apache.org>
Authored: Fri May 27 09:25:28 2016 +0200
Committer: Marcus Eriksson <ma...@apache.org>
Committed: Thu Jun 23 11:08:24 2016 +0200

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../db/compaction/CompactionManager.java        | 16 ++----
 .../cassandra/service/ActiveRepairService.java  | 59 ++++++++++++++++++--
 .../service/ActiveRepairServiceTest.java        | 57 +++++++++++++++----
 4 files changed, 105 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 9a3779c..03246ae 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,5 +1,6 @@
 2.1.15
  * Prevent select statements with clustering key > 64k (CASSANDRA-11882)
+ * Avoid marking too many sstables as repaired (CASSANDRA-11696)
  * Fix clock skew corrupting other nodes with paxos (CASSANDRA-11991)
  * Remove distinction between non-existing static columns and existing but null in LWTs (CASSANDRA-9842)
  * Support mlockall on IBM POWER arch (CASSANDRA-11576)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
index 5af63fe..87819ba 100644
--- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
+++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java
@@ -474,7 +474,7 @@ public class CompactionManager implements CompactionManagerMBean
     /**
      * Make sure the {validatedForRepair} are marked for compaction before calling this.
      *
-     * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefs(..)).
+     * Caller must reference the validatedForRepair sstables (via ParentRepairSession.getActiveRepairedSSTableRefsForAntiCompaction(..)).
      *
      * @param cfs
      * @param ranges Ranges that the repair was carried out on
@@ -1030,17 +1030,9 @@ public class CompactionManager implements CompactionManagerMBean
                     sstables = cfs.selectAndReference(ColumnFamilyStore.CANONICAL_SSTABLES).refs;
                 else
                 {
-                    ColumnFamilyStore.RefViewFragment refView = cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES);
-                    sstables = refView.refs;
-                    Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
-
-                    if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(refView.sstables)).isEmpty())
-                    {
-                        logger.error("Cannot start multiple repair sessions over the same sstables");
-                        throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
-                    }
-
-                    ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId).addSSTables(cfs.metadata.cfId, refView.sstables);
+                    ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(validator.desc.parentSessionId);
+                    prs.markSSTablesRepairing(cfs.metadata.cfId, validator.desc.parentSessionId);
+                    sstables = cfs.selectAndReference(ColumnFamilyStore.UNREPAIRED_SSTABLES).refs;
                 }
 
                 if (validator.gcBefore > 0)

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/src/java/org/apache/cassandra/service/ActiveRepairService.java
----------------------------------------------------------------------
diff --git a/src/java/org/apache/cassandra/service/ActiveRepairService.java b/src/java/org/apache/cassandra/service/ActiveRepairService.java
index 4c83c48..bab244d 100644
--- a/src/java/org/apache/cassandra/service/ActiveRepairService.java
+++ b/src/java/org/apache/cassandra/service/ActiveRepairService.java
@@ -425,7 +425,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         List<ListenableFuture<?>> futures = new ArrayList<>();
         for (Map.Entry<UUID, ColumnFamilyStore> columnFamilyStoreEntry : prs.columnFamilyStores.entrySet())
         {
-            Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefs(columnFamilyStoreEntry.getKey());
+            Refs<SSTableReader> sstables = prs.getActiveRepairedSSTableRefsForAntiCompaction(columnFamilyStoreEntry.getKey());
             ColumnFamilyStore cfs = columnFamilyStoreEntry.getValue();
             futures.add(CompactionManager.instance.submitAntiCompaction(cfs, prs.ranges, sstables, prs.repairedAt));
         }
@@ -465,6 +465,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
         }
     }
 
+    /**
+     * We keep a ParentRepairSession around for the duration of the entire repair, for example, on a 256 token vnode rf=3 cluster
+     * we would have 768 RepairSession but only one ParentRepairSession. We use the PRS to avoid anticompacting the sstables
+     * 768 times, instead we take all repaired ranges at the end of the repair and anticompact once.
+     *
+     * We do an optimistic marking of sstables - when we start an incremental repair we mark all unrepaired sstables as
+     * repairing (@see markSSTablesRepairing), then while the repair is ongoing compactions might remove those sstables,
+     * and when it is time for anticompaction we will only anticompact the sstables that are still on disk.
+     *
+     * Note that validation and streaming do not care about which sstables we have marked as repairing - they operate on
+     * all unrepaired sstables (if it is incremental), otherwise we would not get a correct repair.
+     */
     public static class ParentRepairSession
     {
         public final Map<UUID, ColumnFamilyStore> columnFamilyStores = new HashMap<>();
@@ -480,13 +492,16 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
          * request, we need to fail the coordinator as well.
          */
         public final boolean failed;
+        /**
+         * Indicates whether we have marked sstables as repairing. Can only be done once per table per ParentRepairSession
+         */
+        private final Set<UUID> marked = new HashSet<>();
 
         public ParentRepairSession(InetAddress coordinator, List<ColumnFamilyStore> columnFamilyStores, Collection<Range<Token>> ranges, long repairedAt, boolean failed)
         {
             this.coordinator = coordinator;
             for (ColumnFamilyStore cfs : columnFamilyStores)
             {
-
                 this.columnFamilyStores.put(cfs.metadata.cfId, cfs);
                 sstableMap.put(cfs.metadata.cfId, new HashSet<String>());
             }
@@ -500,9 +515,18 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             this(coordinator, columnFamilyStores, ranges, repairedAt, false);
         }
 
+        /**
+         * Gets the repairing sstables for anticompaction.
+         *
+         * Note that validation and streaming uses the real unrepaired sstables.
+         *
+         * @param cfId
+         * @return
+         */
         @SuppressWarnings("resource")
-        public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefs(UUID cfId)
+        public synchronized Refs<SSTableReader> getActiveRepairedSSTableRefsForAntiCompaction(UUID cfId)
         {
+            assert marked.contains(cfId);
             ImmutableMap.Builder<SSTableReader, Ref<SSTableReader>> references = ImmutableMap.builder();
             for (SSTableReader sstable : getActiveSSTables(cfId))
             {
@@ -515,6 +539,30 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             return new Refs<>(references.build());
         }
 
+        /**
+         * Marks all the unrepaired sstables as repairing unless we have already done so.
+         *
+         * Any of these sstables that are still on disk are then anticompacted once the streaming and validation phases are done.
+         *
+         * @param cfId
+         * @param parentSessionId used to check that we don't start multiple inc repair sessions over the same sstables
+         */
+        public synchronized void markSSTablesRepairing(UUID cfId, UUID parentSessionId)
+        {
+            if (!marked.contains(cfId))
+            {
+                List<SSTableReader> sstables = columnFamilyStores.get(cfId).select(ColumnFamilyStore.UNREPAIRED_SSTABLES).sstables;
+                Set<SSTableReader> currentlyRepairing = ActiveRepairService.instance.currentlyRepairing(cfId, parentSessionId);
+                if (!Sets.intersection(currentlyRepairing, Sets.newHashSet(sstables)).isEmpty())
+                {
+                    logger.error("Cannot start multiple repair sessions over the same sstables");
+                    throw new RuntimeException("Cannot start multiple repair sessions over the same sstables");
+                }
+                addSSTables(cfId, sstables);
+                marked.add(cfId);
+            }
+        }
+
         private Set<SSTableReader> getActiveSSTables(UUID cfId)
         {
             if (failed)
@@ -534,12 +582,10 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
             return activeSSTables;
         }
 
-        public void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
+        private void addSSTables(UUID cfId, Collection<SSTableReader> sstables)
         {
             for (SSTableReader sstable : sstables)
-            {
                 sstableMap.get(cfId).add(sstable.getFilename());
-            }
         }
 
         public ParentRepairSession asFailed()
@@ -556,6 +602,7 @@ public class ActiveRepairService implements IEndpointStateChangeSubscriber, IFai
                     ", repairedAt=" + repairedAt +
                     '}';
         }
+
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/cassandra/blob/3c8421a3/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
----------------------------------------------------------------------
diff --git a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
index 26e5126..cf64322 100644
--- a/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
+++ b/test/unit/org/apache/cassandra/service/ActiveRepairServiceTest.java
@@ -42,6 +42,7 @@ import org.apache.cassandra.utils.concurrent.Refs;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 public class ActiveRepairServiceTest extends SchemaLoader
 {
@@ -58,12 +59,9 @@ public class ActiveRepairServiceTest extends SchemaLoader
         UUID prsId = UUID.randomUUID();
         ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
         ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
-
-        //add all sstables to parent repair session
-        prs.addSSTables(store.metadata.cfId, original);
-
+        prs.markSSTablesRepairing(store.metadata.cfId, prsId);
         //retrieve all sstable references from parent repair sessions
-        Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+        Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId);
         Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
         assertEquals(original, retrieved);
         refs.release();
@@ -76,22 +74,62 @@ public class ActiveRepairServiceTest extends SchemaLoader
         store.getDataTracker().replaceWithNewInstances(Collections.singleton(removed), Collections.EMPTY_SET);
 
         //retrieve sstable references from parent repair session again - removed sstable must not be present
-        refs = prs.getActiveRepairedSSTableRefs(store.metadata.cfId);
+        refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId);
         retrieved = Sets.newHashSet(refs.iterator());
         assertEquals(newLiveSet, retrieved);
         assertFalse(retrieved.contains(removed));
         refs.release();
     }
 
+    @Test
+    public void testAddingMoreSSTables()
+    {
+        ColumnFamilyStore store = prepareColumnFamilyStore();
+        Set<SSTableReader> original = store.getUnrepairedSSTables();
+        UUID prsId = UUID.randomUUID();
+        ActiveRepairService.instance.registerParentRepairSession(prsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
+        ActiveRepairService.ParentRepairSession prs = ActiveRepairService.instance.getParentRepairSession(prsId);
+        prs.markSSTablesRepairing(store.metadata.cfId, prsId);
+        try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId))
+        {
+            Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+            assertEquals(original, retrieved);
+        }
+        createSSTables(store, 2);
+        boolean exception = false;
+        try
+        {
+            UUID newPrsId = UUID.randomUUID();
+            ActiveRepairService.instance.registerParentRepairSession(newPrsId, FBUtilities.getBroadcastAddress(), Collections.singletonList(store), null);
+            ActiveRepairService.instance.getParentRepairSession(newPrsId).markSSTablesRepairing(store.metadata.cfId, newPrsId);
+        }
+        catch (Throwable t)
+        {
+            exception = true;
+        }
+        assertTrue(exception);
+
+        try (Refs<SSTableReader> refs = prs.getActiveRepairedSSTableRefsForAntiCompaction(store.metadata.cfId))
+        {
+            Set<SSTableReader> retrieved = Sets.newHashSet(refs.iterator());
+            assertEquals(original, retrieved);
+        }
+    }
+
     private ColumnFamilyStore prepareColumnFamilyStore()
     {
         Keyspace keyspace = Keyspace.open(KEYSPACE1);
         ColumnFamilyStore store = keyspace.getColumnFamilyStore(CF);
         store.truncateBlocking();
         store.disableAutoCompaction();
+        createSSTables(store, 10);
+        return store;
+    }
+
+    private void createSSTables(ColumnFamilyStore cfs, int count)
+    {
         long timestamp = System.currentTimeMillis();
-        //create 10 sstables
-        for (int i = 0; i < 10; i++)
+        for (int i = 0; i < count; i++)
         {
             DecoratedKey key = Util.dk(Integer.toString(i));
             Mutation rm = new Mutation(KEYSPACE1, key.getKey());
@@ -101,8 +139,7 @@ public class ActiveRepairServiceTest extends SchemaLoader
                        timestamp,
                        0);
             rm.apply();
-            store.forceBlockingFlush();
+            cfs.forceBlockingFlush();
         }
-        return store;
     }
 }